我目前正在利用芹菜来执行定期任务。我是芹菜新手。我有两个工作人员运行两个不同的队列。一种用于缓慢的后台作业,另一种用于用户在应用程序中排队的作业。
我正在 datadog 上监视我的任务,因为这是确认我的工作人员正常运行的简单方法。
我想要做的是在每个任务完成后,记录该任务在哪个队列上完成。
@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
statsd.increment("celery.on_task_publish.start.increment")
task = celery.tasks.get(sender)
queue_name = task.queue
statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])
以下功能是我在研究了 celery 文档和一些 StackOverflow 帖子后实现的,但它没有按预期工作。我得到第一个 statsd 增量,但其余代码不执行。
我想知道是否有一种更简单的方法来检查每个任务内部/完成后哪个队列处理了该任务。
既然你的问题说有没有办法在每个任务完成后检查内部/- 我假设你还没有尝试过这种芹菜结果后端的东西。所以你可以看看 Celery 本身提供的这个功能:Celery-Result-Backend / Task-result-Backend
。
它对于存储 celery 任务的结果非常有用。
通读此内容 =>https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings
一旦您了解如何设置此结果后端,请搜索result_extended
密钥(在同一链接中)能够添加queue-names
在您的任务返回值中。
可用选项数量 - 就像您可以将这些结果设置为转到其中任何一个:
Sql-DB / NoSql-DB / S3 / Azure / Elasticsearch / etc
我已经利用了这个Result-Backend
特征与Elasticsearch
这就是我的任务结果的存储方式:
只需添加一些配置即可settings.py
根据您的要求归档。非常适合我的应用程序。我有一个每周 cron 只清除successful results
任务 - 因为我们不再需要结果 - 我只能看到failed results
(如图所示)。
这些是我的要求的主要关键:task_track_started
and task_acks_late
随着result_backend
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)