我正在使用 django 1.4、celery 3.0、rabbitmq
为了描述该问题,我的系统中有许多内容网络,并且我需要一个队列来处理与每个网络相关的任务。
然而,内容是在系统运行时动态创建的,因此我需要动态创建队列并让现有工作人员开始处理它们。
我尝试通过以下方式安排任务(其中内容是 django 模型实例):
queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)
这将创建一个名为的队列content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
,创建一个名为 name 的新交换content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
和路由密钥content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
并将任务发送到该队列。
然而,我从未见过工人们承担起这些任务。我当前设置的工作人员不会侦听任何特定队列(未使用队列名称初始化),并且会很好地拾取发送到默认队列的任务。我的芹菜设置是:
BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False
from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
routing_key=CELERY_DEFAULT_ROUTING_KEY),
)
CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1
知道如何让工作人员接收发送到这个新创建的队列的任务吗?