Celery / RabbitMQ - 找出 No Acks - 未确认的消息

2023-12-20

我正在尝试找出如何获取有关未确认消息的信息。这些存储在哪里?在使用 celery 检查时,似乎一旦消息得到确认,它就会处理完毕,并且您可以跟踪状态。假设您有一个结果后端,那么您可以看到它的结果。但从你应用延迟的那一刻起,直到它被承认它处于黑洞中。

  1. noAcks 存储在哪里?
  2. 我如何知道 noAcks 列表的“深度”?换句话说,有多少个以及我的任务在列表中的位置。

虽然与我正在处理的问题并不完全相关。

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved()

# or "ACTIVE" jobs.. 
data = inspect.active()

# or "REVOKED" jobs.. 
data = inspect.revoked()

# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()

# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??

# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))

我非常感谢您对此的建议和帮助。


它们是那些任务inspect.reserved()具有'acknowleged': False

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# those that have been sent to a worker and are thus reserved
# from being sent to another worker, but may or may not be acknowledged as received by that worker
data = inspect.reserved()

{'celery.tasks': [{'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '09d5b726-269e-48d0-8b0e-86472d795906',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None}],
 'fastlane.tasks': [],
 'images.tasks': [],
 'mailer.tasks': []}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Celery / RabbitMQ - 找出 No Acks - 未确认的消息 的相关文章

  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • 如何让 Celery 工作人员返回任务结果

    我有一个调用任务的烧瓶应用程序 该任务从数据库中提取数据 绘制折线图并返回在 html 页面上呈现的 html 内容 如果没有 Celery Flask 应用程序可以正常工作并在客户端呈现折线图 但现在我想委托 celery 通过以下方式运
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • django redis celery 和 celerybeats 的正确设置

    我一直在尝试设置 django celery redis celery beats 但它给我带来了麻烦 文档非常简单 但是当我运行 django 服务器 redis celery 和 celerybeats 时 没有打印或记录任何内容 我所
  • python celery - 导入错误:没有名为 _curses 的模块 - 尝试运行 manage.py celeryev 时

    背景 视窗 7 x 64 Python 2 7 姜戈1 4 Celery 与 Redis 捆绑包 在尝试运行 manage py celeryev 时 我在终端中收到以下错误 import curses File c Python2 lib
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • 用 Chronos 取代 Celerybeat

    成熟到什么程度Chronos http airbnb github io chronos 它是像 celery beat 这样的调度程序的可行替代品吗 现在 我们的调度实现了一个定期的 心跳 任务 该任务检查 未完成的 事件 并在过期时触发
  • Erl 无法连接到本地 EPMD。为什么?

    Erlang R14B04 erts 5 8 5 source 64 bit rq 1 async threads 0 kernel poll false Eshell V5 8 5 abort with G root ip 10 101
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • RabbitMQ 管理插件窗口呈现为空白页面

    I have installed Erlang RabbitMQ and configured the management plugin as per the instructions on the website https www r
  • 防止 Celery Beat 运行相同的任务

    我有一个计划的 celery 每 30 秒运行一次任务 我有一个每天作为任务运行 另一个每周在用户指定的时间和一周中的某一天运行 它检查 开始时间 和 下一个预定日期 在任务完成之前 下一个计划日期不会更新 但是 我想知道如何确保 cele
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • 将 sensu-client 连接到服务器时 AMQP 连接的 bad_header

    我已经安装了 sensu 和厨师社区食谱 但是 sensu客户端无法连接到服务器 导致rabbitmq连接错误 尝试连接时消息超时 这是详细的客户端日志 来自 sensu client log 的日志 timestamp 2014 07 0
  • RabbitMQ:无法启动rabbitmq_management插件

    Version gt sudo rabbitmqctl status grep rabbit RabbitMQ rabbit RabbitMQ 3 5 6 Error gt sudo rabbitmq plugins enable rabb
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • 从 Java/Spring 检索 RabbitMQ 队列中未确认消息的数量

    有没有办法返回未确认的消息数 我正在使用此代码来获取队列中的消息数 DeclareOk declareOk amqpAdmin getRabbitTemplate execute new ChannelCallback
  • 如何将返回列表的 Celery 任务链接到一个组中?

    我想从 Celery 任务返回的列表创建一个组 以便对于任务结果集中的每一项 一个任务将添加到该组中 这是一个简单的代码示例来解释用例 这 应该是上一个任务的结果 celery task def get list amount In rea
  • Django Celery:管理界面显示零任务/工作人员

    我已经将 Celery 设置为 Django ORM 作为后端 试图监视幕后发生的事情 我已经开始了celeryd带 E 标志python manage py celeryd E l INFO v 1 f path to celeryd l
  • 公共交通错误队列正在消耗,但仍然不为空

    我正在使用 Mastransit 3 5 0 和 RabbitMq 如果队列消费者抛出异常 则默认由 MoveExceptionToTransportFilter 处理异常并移至 error 队列 对于 error 队列 我有单独的消费者

随机推荐

  • SQL LIKE 列值加通配符 DB2

    我试图在 LIKE 子句中使用列和通配符 我期待这能起作用 WHERE COLUMNA LIKE COLUMNB DB2 支持此功能吗 应该能够使用定位语法 http publib boulder ibm com infocenter dz
  • Mac OS 10.9 上的 Bluecove

    我正在尝试使用 bluecove 将我的 Android 设备连接到 PC 它在 Windows 上运行良好 但在 MAC OS 10 9 Mavericks 上出现以下问题 dyld lazy symbol binding failed
  • 如何获取存储在默认架构表中的租户标识符?

    我正在努力使用 hibernate 在基于 spring 的应用程序中启用多租户 我创建了一个自定义实现CurrentTenantIdentifierResolver并覆盖了resolveCurrentTenantIdentifier 确定
  • 如果文件不存在则创建一个

    我正在尝试打开一个文件 如果该文件不存在 我需要创建它并打开它进行写入 到目前为止我有这个 open file for reading fn input Enter file to open fh open fn r if file doe
  • 如何在 ubuntu 上卸载 Ruby?

    如何卸载 Ruby 1 9 2dev 2010 07 02 i486 linux 在ubuntu上 需要重新安装 请帮忙 从终端运行以下命令 sudo apt get purge ruby 通常对我来说效果很好 注意 这会删除与 GRUB
  • 使用 Media Foundation 对 Direct X 表面进行编码

    我正在尝试使用 MediaFoundation API 对视频进行编码 但在将示例推送到 SinkWriter 时遇到问题 我正在通过桌面复制 API 获取要编码的帧 我最终得到的是一个包含桌面图像的 ID3D11Texture2D 我正在
  • Visual Studio 添加数据连接 - 字典中不存在给定的键

    我已经阅读了之前的几个类似问题 但似乎没有一个提供解决方案 所以我再问一次 我正在使用 Visual Studio 并尝试连接到服务器资源管理器中的数据库 无论我尝试连接哪个数据库 它都会出现 字典中不存在给定的密钥 错误 我尝试过使用 S
  • 在 Windows 上安装 GLo​​g

    我需要安装 GLo g 才能在 OpenCV 中使用 SFM 模块 我已经找到了源代码链接 https github com google glog但我真的不知道如何在我的系统上安装它 因为我没有太多的经验 还有一个 自述文件 window
  • NDB 查询返回零结果。数据存储显示结果

    我发现这个特殊的问题 运行查询 确认记录存在 返回计数为零 这是我的模型 class Description ndb Model description ndb TextProperty time posted ndb DateTimePr
  • 将 OpenCV 矩阵循环转换为 JavaCV

    不久前得到了 O Reilly 的 学习 OpenCV 一书 从那时起 我一直忙于将我看到的所有示例代码从 OpenCV 转换为 JavaCV 通常还会进行一些我自己的修改 一直以来 我都试图尽可能保持纯 OpenCV C 语言 代码并避免
  • 有没有安全的方法来管理 API 密钥?

    我正在使用一个API https www themoviedb org在我的应用程序中 我目前通过 java 界面管理 API 密钥 public interface APIContract The API KEY MUST NOT BE
  • 函数的 va_arg 参数存储在堆栈内存还是堆内存中?

    问题如题 变量参数列表是根据其使用情况存储在堆栈内存中 还是分配到堆中 更重要的是它们存储在哪里 为什么它们存储在 X 中 该标准没有提及实现 只提及 va arg 和相关 宏 的行为 它们甚至不必是宏 根据编译器的 正常 约定 变量参数可
  • Jenkins 在 Websphere 8.5 上运行导致插件类加载错误

    我正在运行 Jenkins 1 552 WebSphere 8 5 with Java 7 on RedHat Linux 我将 Artifactory 插件加载到 Jenkins 中 但是当我进行测试连接时 收到以下错误 org apac
  • MediaPlayer 无法准备?

    我编写了一个流媒体广播应用程序 其功能相当基本 但有些设备拒绝 准备 例如运行 2 2 的我的 Optimus One 我的应用程序在其他 2 2 设备上运行 我兄弟的 S2 运行 2 3 我的 Prime 运行 4 0 3 知道为什么我的
  • 如何抑制 Jasmine 中跳过的测试的输出

    如果我的场景包含 1000 多个测试 并且只想运行其中的选定部分 我可以使用fdescribe 其余的测试被跳过 这很好 但它们仍然污染控制台输出 如何抑制跳过测试的控制台输出 如果您通过 Karma 运行测试 则可以配置一个规范报告器插件
  • 如何在 pandas 中使用滚动?

    我正在研究下面的代码 Resample interpolate and inspect ozone data here data data resample D interpolate data info Create the rollin
  • Wix 如何隐藏功能选项(无子功能)

    有一个类似的问题 在自定义对话框中编辑上下文菜单 选择树 https stackoverflow com questions 12929930 edit context menu selectiontree in customize dia
  • 创建 NSString 后保留计数

    我正在通过以下方法创建一个 NSString 类型的对象 NSString str NSString alloc initWithString aaaaaaaaaaaaaaa NSLog retain count d str retainC
  • Rails 4+ 最佳实践:删除父级,同时保留子级

    我想保留子记录和层次结构 即使父记录被删除 我看到两个选择 保留现有的父级并利用 deleted at 字段来指示 父母本身不活跃 但关系仍然存在 这将导致许多实际上已失效的父记录被永久存储 嗯 将所有废弃的子记录分配给通用 收集器 僵尸父
  • Celery / RabbitMQ - 找出 No Acks - 未确认的消息

    我正在尝试找出如何获取有关未确认消息的信息 这些存储在哪里 在使用 celery 检查时 似乎一旦消息得到确认 它就会处理完毕 并且您可以跟踪状态 假设您有一个结果后端 那么您可以看到它的结果 但从你应用延迟的那一刻起 直到它被承认它处于黑