我有 2 个自定义任务(TaskA
and TaskB
),两者都继承自celery.Task
。调度程序启动TaskA
时不时地,并且TaskA
发射N
times TaskB
每次都有不同的论据。但由于某种原因,有时是相同的TaskB
使用相同的参数,同时执行两次,这会导致数据库出现不同的问题。
class TaskA(celery.Task):
def run(self, *args, **kwargs):
objects = MyModel.objects.filter(processed=False)\
.values_list('id', flat=True)
task_b = TaskB()
for o in objects:
o.apply_async(args=[o, ])
class TaskB(celery.Task):
def run(self, obj_id, *args, **kwargs):
obj = MyModel.objects.get(id=obj_id)
# do some stuff with obj
我尝试过的事情
我尝试使用celery.group
希望它能解决这些问题,但我得到的只是错误,说run
需要 2 个参数,但没有提供任何参数。
这就是我尝试启动的方式TaskB
using celery.group
:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
我也这样尝试过:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
之前就在那里执行了任务g.apply_async()
.
Question
问题是出在我启动任务的方式上还是其他原因?这是正常行为吗?
附加信息
在我的本地机器上我运行celery 3.1.13
with RabbitMQ 3.3.4
,并在服务器上celery 3.1.13
与运行Redis 2.8.9
。
在本地计算机上我没有看到这样的行为,每个任务都执行一次。在服务器上,我看到 1 - 10 个此类任务连续执行两次。
这是我在本地计算机和服务器上运行 celery 的方式:
celery_beat: celery -A proj beat -l info
celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50
celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
可行的解决方法
我引入了一个锁TaskB
基于它收到的论据。经过大约 10 个小时的测试,我看到到底执行了两次,但锁可以防止数据库发生冲突。
这确实解决了我的问题,但我仍然想了解为什么会发生这种情况。