如何在 celery 内为每个用户生成队列?

2024-03-04

因此,我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列。我对消息传递和发布/订阅也很陌生。用户将数据推送到那里并进行处理,稍后用户会收到相关通知。我为此做了一个 celery 设置,发现它不能满足我为每个用户分配自己的任务的专用队列的用例。

我尝试指定创建丢失的队列以及在工作进程生成期间(发送以逗号分隔的队列名称),并将它们列在队列设置中,如互联网上之前的答案中所述“使用芹菜动态创建队列”。它创建队列,但当我指定与设置和命令行中指定名称不同的队列名称时,它不会创建队列。解决方案是生成更多具有队列名称的工作程序,但这不能满足用例,因为将有数百万个数据处理请求。

我发现 python-rq 有 Queue 对象初始化及其名称,我认为它创建了新队列。如果是这样,转向 RQ 是否正确?

redis_conn = Redis()
q = Queue('some_queue', connection=redis_conn)

我想要的是每个用户在后台队列自己的任务。我没有在网上看到任何在 celery 中创建动态队列的解决方案(无需在命令行中指定队列名称或在设置中指定)。 python-rq 似乎有这个解决方案。我的权衡是从 RabbitMQ 和 celry 迁移到 Redis。

有没有办法真正在 celery 中执行每个用户队列?如果是,请列出步骤。或者这种设计模式就是不正确? pubsub 会满足用例吗?


Celery Worker 仅从以下定义的队列中消费task_queues https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_queues设置或在命令行上给出-Q https://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-q选项。但是,这可以通过命令行或代码动态更改。只要确保有task_create_missing_queues https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_create_missing_queues在设置中启用(这是默认设置),以便当您开始使用新队列时会自动创建新队列。

因此,在为给定用户发送任务之前,您必须指示工作人员开始从用户的队列中消费。这可以通过使用命令行来实现add_consumer https://docs.celeryproject.org/en/latest/userguide/workers.html#std:control-add_consumer控制命令,或使用代码app.control.add_consumer() https://docs.celeryproject.org/en/latest/reference/celery.app.control.html#celery.app.control.Control.add_consumer方法。这些操作是幂等的,因此如果工作线程已经从队列中消耗数据,则不会发生任何事情。如果队列尚不存在,则会自动创建。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 celery 内为每个用户生成队列? 的相关文章

随机推荐