我想有一个队列工作者管理工具,它允许添加新的队列,并将作业注册到这些队列中,生成工作者来处理这些作业。
到目前为止我有这个代码:
from redis import Redis
from rq import Queue, Retry, Worker
class WorkerPool: # TODO: find a better name
def __init__(self):
self._queues = {}
self._workers = []
self._redis_conn = Redis()
def _get_queue(self, name):
try:
return self._queues[name]
except KeyError:
new_queue = Queue(name, connection=self._redis_conn)
self._queues[name] = new_queue
new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
new_worker.work() # Blocking :(
return new_queue
def add_job(self, queue, func, *func_args):
q = self._get_queue(queue)
job = q.enqueue(func, *func_args, retry=Retry(max=3))
return job
正如我们所看到的 work()
函数阻止执行,而我希望它在后台工作。我想我可以在这里创建另一个线程-然后调用 work()
从一个线程,而主线程返回的工作,然而,这似乎有点尴尬我。有内置的吗 Redis
(或其他已知模块)解决方案?
ps,欢迎给我的班级起更好的名字:)
这是我对多处理it的看法(由于来自非法线程的信号,线程将无法工作):
import multiprocessing as mp
from redis import Redis
from rq import Queue, Retry, Worker
class WorkerPool: # TODO: find a better name
def __init__(self):
self._queues = {}
self._worker_procs = []
self._redis_conn = Redis()
def __del__(self):
for proc in self._worker_procs:
proc.kill()
def _get_queue(self, name):
try:
return self._queues[name]
except KeyError:
new_queue = Queue(name, connection=self._redis_conn)
self._queues[name] = new_queue
new_worker = Worker([new_queue], connection=self._redis_conn, name=name)
worker_process = mp.Process(target=new_worker.work)
worker_process.start()
self._worker_procs.append(worker_process)
return new_queue
def add_job(self, queue, func, *func_args):
q = self._get_queue(queue)
job = q.enqueue(func, *func_args, retry=Retry(max=3))
return job
不知道这有多好,但它似乎做了我现在想要的
1条答案
按热度按时间nom7f22z1#
如果您只需要小规模的多处理,绑定到一个主进程,所有这些都在一台机器上运行,请查看多处理模块和
concurrent.futures
模块及其应用Pool
以及ProcessPoolExecutor
物体。除非您有特定的要求,否则最好使用Pool
或者ProcessPoolExecutor
而不是启动Process
手动创建对象(在这种情况下,redis可能会也可能不会杀伤力过大。)如果您的需求规模更大,需要跨多台机器的工作人员,那么就有一整套用于运行这些工作人员的软件;rabbitmq是一种广为人知的技术,但它只是几种技术中的一种,每种技术都有自己的优缺点。每个云提供商(如果你在云中)也有自己的功能。您可能想了解几种现成解决方案的特性,确定哪一种是好的匹配方案,然后进行设置。
也就是说,我在过去实现了一个基于redis的定制排队系统;有时你真的需要一些现有解决方案都没有提供的东西。在这种情况下,设计将受到您所需要的特性的严重影响。在我看来,这是细粒度的优先级。。。