在看了很多关于chord回调不执行的文章并尝试了他们的解决方案后,我仍然无法让它工作。事实上,chord_unlock方法也因为某种原因没有被执行。
celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('sophie',
broker='redis://localhost:6379/2',
backend='redis://localhost:6379/2',
include=['sophie.lib.chord_test'])
app.conf.update(
CELERY_ACCEPT_CONTENT=["json"],
CELERY_TASK_SERIALIZER="json",
CELERY_TRACK_STARTED=True,
CELERYD_PREFETCH_MULTIPLIER=1, # NO PREFETCHING OF TASKS
BROKER_TRANSPORT_OPTIONS = {
'priority_steps': [0, 1] # ALLOW ONLY 2 TASK PRIORITIES
}
)
if __name__ == '__main__':
app.start()
字符串
chord_test.py
from __future__ import absolute_import
from sophie.celery import app
from celery import chord
@app.task(name='sophie.lib.add')
def add(x, y):
return x + y
@app.task(name='sophie.lib.tsum')
def tsum(numbers):
return sum(numbers)
if __name__ == '__main__':
tasks = [add.s(100, 100), add.s(200, 200)]
chord(tasks, tsum.s()).apply_async()
型
我的worker日志文件的输出如下所示
$ celery worker -l info --app=sophie.celery -n worker1.%h
-------------- [email protected] v3.1.6 (Cipater)
---- **** -----
--- * *** * -- Linux-3.0.0-12-server-x86_64-with-Ubuntu-11.10-oneiric
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> broker: redis://localhost:6379/2
- ** ---------- .> app: sophie:0x3554250
- ** ---------- .> concurrency: 1 (prefork)
- *** --- * --- .> events: OFF (enable -E to monitor this worker)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. sophie.lib.add
. sophie.lib.tsum
[2013-12-12 19:37:26,499: INFO/MainProcess] Connected to redis://localhost:6379/2
[2013-12-12 19:37:26,506: INFO/MainProcess] mingle: searching for neighbors
[2013-12-12 19:37:27,512: INFO/MainProcess] mingle: all alone
[2013-12-12 19:37:27,527: WARNING/MainProcess] [email protected] ready.
[2013-12-12 19:37:29,723: INFO/MainProcess] Received task: sophie.lib.add[b7d504c1-217f-43a9-b57e-86f0fcbdbe22]
[2013-12-12 19:37:29,734: INFO/MainProcess] Task sophie.lib.add[b7d504c1-217f-43a9-b57e-86f0fcbdbe22] succeeded in 0.009769904
00522s: 200
[2013-12-12 19:37:29,735: INFO/MainProcess] Received task: sophie.lib.add[eb01a73e-f6c8-401d-8049-6cdbc5f0bd90]
[2013-12-12 19:37:29,737: INFO/MainProcess] Task sophie.lib.add[eb01a73e-f6c8-401d-8049-6cdbc5f0bd90] succeeded in 0.001446505
00442s: 400
型
根本没有调用chord_unlock。更多的输出可以给予更多的上下文:
$ sudo pip freeze | egrep 'celery|kombu|billiard'
billiard==3.3.0.12
celery==3.1.6
kombu==3.0.7
$ uname -a
Linux vagrant-ubuntu-11 3.0.0-12-server #20-Ubuntu SMP Fri Oct 7 16:36:30 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ redis-server --version
Redis server version 2.2.11 (00000000:0)
型
4条答案
按热度按时间ffx8fchx1#
chords文档使用以下chord语法:
字符串
所以你可以做:
型
smdncfj32#
在chords文档下面有一个重要的注意事项部分,它说:
在chord中使用的任务必须 * 不 * 忽略其结果。实际上,这意味着您必须启用CELERY_RESULT_SUPEND才能使用chord。此外,如果在配置中将CELERY_IGNORE_RESULT设置为True,确保chord中使用的各个任务都是用ignore_result=False定义的。这适用于Task子类和修饰任务。
即使我遵循了建议,和弦回调也不会执行,但可能会帮助其他人。
编辑:
在我的例子中找到了它!chord并不完全支持在所有情况下指定队列名称:https://github.com/celery/celery/issues/2085
slwdgvem3#
当我的任务签名与流提供的分段不匹配时,接受
*args
可能有助于调试此类问题。jtoj6r0c4#
对我来说,回调函数从未执行过,因为celery结果过期时间设置得很低,所以除了其他评论中提到的
CELERY_RESULT_BACKEND
和ignore_result=False
之外,您应该检查此设置以防CELERY_RESULT_EXPIRES
文档链接:https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-result_expires