如何在不阻塞主线程的情况下动态创建redis worker?

dauxcl2d  于 2021-06-08  发布在  Redis
关注(0)|答案(1)|浏览(438)

我想有一个队列工作者管理工具,它允许添加新的队列,并将作业注册到这些队列中,生成工作者来处理这些作业。
到目前为止我有这个代码:

from redis import Redis
from rq import Queue, Retry, Worker

class WorkerPool: # TODO: find a better name
    def __init__(self):
        self._queues = {}
        self._workers = []
        self._redis_conn = Redis()

    def _get_queue(self, name):
        try:
            return self._queues[name]
        except KeyError:
            new_queue = Queue(name, connection=self._redis_conn)
            self._queues[name] = new_queue

            new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
            new_worker.work() # Blocking :(

            return new_queue

    def add_job(self, queue, func, *func_args):
        q = self._get_queue(queue)
        job = q.enqueue(func, *func_args, retry=Retry(max=3))
        return job

正如我们所看到的 work() 函数阻止执行,而我希望它在后台工作。我想我可以在这里创建另一个线程-然后调用 work() 从一个线程,而主线程返回的工作,然而,这似乎有点尴尬我。有内置的吗 Redis (或其他已知模块)解决方案?
ps,欢迎给我的班级起更好的名字:)
这是我对多处理it的看法(由于来自非法线程的信号,线程将无法工作):

import multiprocessing as mp
from redis import Redis
from rq import Queue, Retry, Worker

class WorkerPool: # TODO: find a better name
    def __init__(self):
        self._queues = {}
        self._worker_procs = []
        self._redis_conn = Redis()

    def __del__(self):
        for proc in self._worker_procs:
            proc.kill()

    def _get_queue(self, name):
        try:
            return self._queues[name]
        except KeyError:
            new_queue = Queue(name, connection=self._redis_conn)
            self._queues[name] = new_queue

            new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
            worker_process = mp.Process(target=new_worker.work)
            worker_process.start()
            self._worker_procs.append(worker_process)

            return new_queue

    def add_job(self, queue, func, *func_args):
        q = self._get_queue(queue)
        job = q.enqueue(func, *func_args, retry=Retry(max=3))
        return job

不知道这有多好,但它似乎做了我现在想要的

nom7f22z

nom7f22z1#

如果您只需要小规模的多处理,绑定到一个主进程,所有这些都在一台机器上运行,请查看多处理模块和 concurrent.futures 模块及其应用 Pool 以及 ProcessPoolExecutor 物体。除非您有特定的要求,否则最好使用 Pool 或者 ProcessPoolExecutor 而不是启动 Process 手动创建对象(在这种情况下,redis可能会也可能不会杀伤力过大。)
如果您的需求规模更大,需要跨多台机器的工作人员,那么就有一整套用于运行这些工作人员的软件;rabbitmq是一种广为人知的技术,但它只是几种技术中的一种,每种技术都有自己的优缺点。每个云提供商(如果你在云中)也有自己的功能。您可能想了解几种现成解决方案的特性,确定哪一种是好的匹配方案,然后进行设置。
也就是说,我在过去实现了一个基于redis的定制排队系统;有时你真的需要一些现有解决方案都没有提供的东西。在这种情况下,设计将受到您所需要的特性的严重影响。在我看来,这是细粒度的优先级。。。

相关问题