我正在运行 Django、Celery 和 RabbitMQ。我想要实现的是确保与一个用户相关的任务按顺序执行(具体来说,一次执行一个,我不希望每个用户执行任务并发)
- 每当为用户添加新任务时,它应该取决于最近添加的任务。如果此类型的任务已为此用户排队且尚未启动,则附加功能可能包括不将任务添加到队列中。
我做了一些研究并且:
- 我找不到一种方法将新创建的任务与 Celery 本身已排队的任务链接起来,链似乎只能链接新任务。
- 我认为这两个功能都可以使用自定义 RabbitMQ 消息处理程序来实现,尽管毕竟可能很难编码。
- 我还读过有关 celery-tasktree 的内容,这可能是确保执行顺序的最简单方法,但是如何将新任务与已经的“
applied_async
“task_tree 或队列?有什么方法可以使用这个包实现额外的无重复功能吗?
编辑:还有这个“锁定”示例芹菜食谱 http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html由于这个概念很好,我看不到一种可能的方法来使其按照我的情况按预期工作 - 简单地说,如果我无法为用户获取锁,则必须重试任务,但这意味着将其推送到队列末尾。
这里最好的行动方案是什么?
如果您配置 celery 工作线程,使其一次只能执行一项任务(请参阅工人并发 http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-concurrency设置),然后您可以在每个用户的基础上强制执行所需的并发性。使用类似的方法
NUMBER_OF_CELERY_WORKERS = 10
def get_task_queue_for_user(user):
return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)
根据用户id获取任务队列,每个用户的每个任务都会被分配到同一个队列中。工作人员需要配置为仅使用单个任务队列中的任务。
它会像这样发挥:
-
用户49触发任务
-
任务发送至user_queue_9
-
当唯一一个正在听的芹菜工人user_queue_9
准备好消费一个新任务,该任务被执行
但这是一个很奇怪的答案,因为
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)