我是新使用芹菜,有一个问题。我有这个简单的任务:
@app.task(name='test_install_queue')
def test_install_queue():
return subprocess.call("exit 0",shell=True)
我稍后会在测试用例中调用此任务,例如
result = tasks.test_default_queue.apply_async(queue="install")
任务在队列中成功运行install
(因为我在芹菜日志中看到它,并且它完成得很好。但我想知道一种以编程方式查找任务在哪个队列中的方法test_install_queue
从存储在的对象中运行result
.
谢谢你!
EDIT:
我已将任务更改为:
@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
return self.request.__dict__
然后我使用的结果apply_async
如下:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
解决方法是工作线程(主机名)与工作线程中初始化的唯一队列具有相同的名称。
您可以尝试以下方法:
delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
if queue.exchange.name == delivery_info['exchange']
and queue.routing_key == delivery_info['routing_key']:
original_queue = queue.name
break
该方法建立在您使用默认 celery 设置并且交换是直接的假设之上。如果您需要更通用的扇出和主题交换解决方案,那么您将必须检查每个声明队列的路由键app.amqp.queues
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)