如何检查处理 Celery 任务的队列

2023-12-10

我目前正在利用芹菜来执行定期任务。我是芹菜新手。我有两个工作人员运行两个不同的队列。一种用于缓慢的后台作业,另一种用于用户在应用程序中排队的作业。

我正在 datadog 上监视我的任务,因为这是确认我的工作人员正常运行的简单方法。

我想要做的是在每个任务完成后,记录该任务在哪个队列上完成。

@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    statsd.increment("celery.on_task_publish.start.increment")

    task = celery.tasks.get(sender)
    queue_name = task.queue

    statsd.increment("celery.on_task_publish.increment", tags=[f"{queue_name}:{task}"])

以下功能是我在研究了 celery 文档和一些 StackOverflow 帖子后实现的,但它没有按预期工作。我得到第一个 statsd 增量,但其余代码不执行。

我想知道是否有一种更简单的方法来检查每个任务内部/完成后哪个队列处理了该任务。


既然你的问题说有没有办法在每个任务完成后检查内部/- 我假设你还没有尝试过这种芹菜结果后端的东西。所以你可以看看 Celery 本身提供的这个功能:Celery-Result-Backend / Task-result-Backend。 它对于存储 celery 任务的结果非常有用。 通读此内容 =>https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings


一旦您了解如何设置此结果后端,请搜索result_extended密钥(在同一链接中)能够添加queue-names在您的任务返回值中。

可用选项数量 - 就像您可以将这些结果设置为转到其中任何一个:

Sql-DB / NoSql-DB / S3 / Azure / Elasticsearch / etc 

我已经利用了这个Result-Backend特征与Elasticsearch这就是我的任务结果的存储方式:

enter image description here

只需添加一些配置即可settings.py根据您的要求归档。非常适合我的应用程序。我有一个每周 cron 只清除successful results任务 - 因为我们不再需要结果 - 我只能看到failed results (如图所示)。

这些是我的要求的主要关键:task_track_started and task_acks_late随着result_backend

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何检查处理 Celery 任务的队列 的相关文章

  • Kivy - 有所有颜色名称的列表吗?

    在 Kivy 中 小部件 color属性允许输入其值作为字符串颜色名称 也 例如在 kv file Label color red 是否有所有可能的颜色名称的列表 就在这里 来自Kivy 的文档 https kivy org doc sta
  • Python 中的流式传输管道

    我正在尝试使用 Python 将 vmstat 的输出转换为 CSV 文件 因此我使用类似的方法转换为 CSV 并将日期和时间添加为列 vmstat 5 python myscript py gt gt vmstat log 我遇到的问题是
  • Python3.0 - 标记化和取消标记化

    我正在使用类似于以下简化脚本的内容来解析较大文件中的 python 片段 import io import tokenize src foo bar src bytes src encode src io BytesIO src src l
  • 在没有模型的情况下将自定义页面添加到 django admin

    我正在尝试在没有模型关联的情况下向管理员添加自定义页面 这就是我迄今为止所取得的成就 class MyCustomAdmin AdminSite def get urls self from django conf urls import
  • 搜索多个字段

    我想我没有正确理解 django haystack 我有一个包含多个字段的数据模型 我希望搜索其中两个字段 class UserProfile models Model user models ForeignKey User unique
  • 如何在 Python 中加密并在 Java 中解密?

    我正在尝试在 Python 程序中加密一些数据并将其保存 然后在 Java 程序中解密该数据 在Python中 我像这样加密它 from Crypto Cipher import AES KEY 1234567890123456789012
  • 从扫描文档中提取行表 opencv python

    我想从扫描的表中提取信息并将其存储为 csv 现在我的表提取算法执行以下步骤 应用倾斜校正 应用高斯滤波器进行去噪 使用 Otsu 阈值进行二值化 进行形态学开局 Canny 边缘检测 进行霍夫变换以获得表格行 去除重复行 10像素范围内相
  • 在 Windows 上使用 apache mod_wsgi 运行 Flask 应用程序时导入冲突

    我允许您询问我在 Windows 上使用您的 mod wsgi portage 托管 Flask 应用程序时遇到的问题 我有两个烧瓶应用程序 由于导入冲突 只有一个可以同时存在 IE 如果请求申请 1 我有回复 然后 如果我请求应用程序 2
  • Django 中所有应用程序的基本模板

    我有一个包含 2 个应用程序的项目 project blog templates index html polls templates index html project templates base html index html 现在
  • 小组芹菜链任务

    shared task def process record x return 1 2 4 4 5 6 shared task def add pro id return pro id 10 shared task def dmap it
  • 如何在 pandas 中使用 read_fwf 跳过空行?

    I use pandas read fwf http pandas pydata org pandas docs stable generated pandas read fwf htmlPython pandas 0 19 2 中的函数读
  • ANTLR 获取并拆分词法分析器内容

    首先 对我的英语感到抱歉 我还在学习 我为我的框架编写 Python 模块 用于解析 CSS 文件 我尝试了 regex ply python 词法分析器和解析器 但我发现自己在 ANTLR 中 第一次尝试 我需要解析 CSS 文件中的注释
  • 返回表示每组内最大值的索引的一系列数字位置

    考虑一下这个系列 np random seed 3 1415 s pd Series np random rand 100 pd MultiIndex from product list ABDCE list abcde One Two T
  • 在 keras 中保存和加载权重

    我试图从我训练过的模型中保存和加载权重 我用来保存模型的代码是 TensorBoard log dir output model fit generator image a b gen batch size steps per epoch
  • 通过纱线安装 bootstrap 的 Rails 找不到字体

    我有一个带 Bootstrap 的 Rails 5 应用程序 我用纱线安装了它 我做了以下事情 yarn add bootstrap bootstrap 3 3 7 version 3 3 7 resolved https registry
  • Mac OSX 10.6 上的 Python mysqldb 不工作

    我正在使用 Python 2 7 并尝试让 Django 项目在 MySQL 后端运行 我已经下载了 mysqldb 并按照此处的指南进行操作 http cd34 com blog programming python mysql pyth
  • Elasticsearch 通过搜索返回拼音标记

    我用语音分析插件 https www elastic co guide en elasticsearch plugins current analysis phonetic html由于语音转换 从弹性搜索中进行一些字符串匹配 我的问题是
  • 如何与其他用户一起使用 pyenv?

    如何与其他用户一起使用 pyenv 例如 如果我在用户 test 的环境中安装了 pyenv 则当我以 test 身份登录时可以使用 pyenv 但是 当我以其他用户 例如 root 身份登录时如何使用 pyenv 即使你这么做了 我也会s
  • 双击打开 ipython 笔记本

    相关文章 通过双击 osx 打开 ipython 笔记本 https stackoverflow com questions 16158893 open an ipython notebook via double click on osx
  • TKinter 中的禁用/启用按钮

    我正在尝试制作一个像开关一样的按钮 所以如果我单击禁用按钮 它将禁用 按钮 有效 如果我再次按下它 它将再次启用它 我尝试了 if else 之类的东西 但没有成功 这是一个例子 from tkinter import fenster Tk

随机推荐

  • 如何找出我的输出从哪里开始?

    我注意到我的网站源代码中的 HTML 上方有两行空行 它看起来像这样 HTML 是使用 PHP 生成的 如何找出输出从哪里开始 我尝试添加 after 希望触发类似 无法修改标题 输出从行 开始 之类的错误 但没有发生错误 如果你想获取输出
  • php 会话无法正常工作

    您好 我在从一个页面到另一个页面保持会话时遇到问题 代码在我以前运行 php5 的服务器上工作 但在我最近的服务器上工作 我想知道这是否是一个错误 有任何想法吗 会议在第一页举行 但不在第二页举行 如果您错过了 请务必执行以下操作sessi
  • Python 的“Extras”和“site-packages”目录有什么区别?

    我对 OS X 上的 Python 使用 Extras 和 site packages 中的包的方式感到困惑 特别是 我对在这些目录中看到的内容 目录中的重复包如何相互影响以及应该将安装的包放在哪里感到困惑 I had assumed th
  • 词法闭包如何工作? [复制]

    这个问题在这里已经有答案了 当我研究 Javascript 代码中的词法闭包问题时 我在 Python 中遇到了这个问题 flist for i in xrange 3 def func x return x i flist append
  • 使用 Spring Data REST 时如何更改 Jacksons 配置?

    我正在尝试将 Jackson 配置为以 ISO 8601 格式显示 JSR 310 瞬间 Configuration class Jackson Bean static ObjectMapper objectMapper ObjectMap
  • 空手道:将变量作为查询参数从一个特征文件传递到另一个特征文件[重复]

    这个问题在这里已经有答案了 我正在尝试执行一个功能文件 并且需要从另一个功能获取查询参数 这是功能文件 1 该文件调用了两个特征文件 第一个功能文件在数据库中添加一条记录并返回 json 响应 我必须使用响应中的属性 并且必须传递给另一个功
  • YouTube API - onPlayerStateChange

    我将 YouTube API 与 Cyclone Slider 结合使用 目标是在 YouTube 开始播放后暂停幻灯片放映 我正在使用以下代码 效果很好
  • 自定义 URLClassLoader,运行时出现 NoClassDefFoundError

    我已经创建了自己的URLClassLoader 并将其设置为系统类加载器java system class loader 它已初始化且一切正常 但找不到我尝试加载的类 这是URLClassLoader public class Librar
  • 无法在 Cordova 3.4 中使用异步方法。第一次调用插件方法后不会调用 Onsuccess

    以下代码无法正常工作 public class TestPlugin extends CordovaPlugin public static CallbackContext callbackContext class TestRun imp
  • Windows Phone 短信无需确认

    是否有可能在不使用启动器任务的情况下发送短信 Windows Phone 7 x 中的 SmsComposeTask 像紧急短信这样的应用程序似乎可以做到这一点 这意味着可以在没有用户确认的情况下发送短信 不可以 未经用户确认 无法发送短信
  • 为什么 Scala 编译器不允许使用默认参数的重载方法?

    虽然在某些有效情况下 此类方法重载可能会变得不明确 但为什么编译器不允许在编译时和运行时都不是不明确的代码呢 Example This fails def foo a String b Int 42 a b def foo a Int b
  • Mongoose 填充返回空数组

    我正在尝试使用 mongoose populate 函数 但作为响应 我得到了空数组 我已经看到了多个关于此的帖子 var MerchantSchema new mongoose Schema packages type mongoose
  • iPhone 应用程序:我可以通过网站中的链接打开应用程序吗?

    我是一名 iPhone 开发新手 正在编写一个应用程序 该应用程序将成为网站的 移动版本 我想知道是否可以从网站中的链接启动我的应用程序 例如 有人在 iPhone Safari 中进入我们的网站 单击链接 我们的应用程序就会启动 那可能吗
  • Android - ListView 中的标题消失了?

    我最近遇到了一个非常奇怪的错误 而且根本不知道该怎么办 我有一个 Tabbed Fragment Activity 这意味着我需要在底部有一个 tabhost 所以我使用了 google 的 API 示例 它通过 TabHost 和管理器
  • 如何为多列显示字段使用自定义格式?

    有谁知道扩展其 正常 用途的方法 它是id默认情况下 可以轻松更改为field1 我有一个显示字段集如下 this gt displayField name desc 显示为Name Description在模板文件中调用时 我知道这是一个
  • 由于停靠面板,无法从顶部调整无边框 winform 的大小

    我制作了一个无边框的Windows窗体 添加了阴影 拖动的能力 以及从任何角度调整其大小的能力 这就是我所做的 有点乱 using System Runtime InteropServices public class MAIN DllIm
  • 在python opencv中通过网络发送实时视频帧

    我正在尝试将我用相机捕获的实时视频帧发送到服务器并处理它们 我使用 opencv 进行图像处理 使用 python 进行语言处理 这是我的代码 client cv py import cv2 import numpy as np impor
  • com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:“where 子句”中的未知列“Smith”

    希望你能帮我 我收到错误 com mysql jdbc exceptions jdbc4 MySQLSyntaxErrorException 未知 where 子句 中的 Smith 列 我有一个名为 aspirante 的数据库 它有一个
  • ApplicationRoute 的加载状态

    使用 余烬1 7 0 我有一些服务器端数据 希望在转换任何路由之前将其加载到我的 ember 应用程序中 我的多个 但不是全部 其他路线 控制器需要此数据 我想我可以在 ApplicationRoute 中加载这些数据model方法 它工作
  • 如何检查处理 Celery 任务的队列

    我目前正在利用芹菜来执行定期任务 我是芹菜新手 我有两个工作人员运行两个不同的队列 一种用于缓慢的后台作业 另一种用于用户在应用程序中排队的作业 我正在 datadog 上监视我的任务 因为这是确认我的工作人员正常运行的简单方法 我想要做的