在 pika / RabbitMQ 中处理长时间运行的任务

2023-12-20

我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,处理它并确认消息。

问题是,处理过程可能需要 10-20 分钟,而且我们当时没有回复消息,导致服务器与我们断开连接。

这是我们消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

第一个任务完成后,BlockingConnection 内部深处的某个地方会抛出异常,抱怨套接字已重置。另外,RabbitMQ日志显示消费者因未及时响应而被断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这一点)。

我们进行了很多搜索,因为我们相信这是 RabbitMQ 的正常用例(有很多长时间运行的任务应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后我们偶然发现了一个线程,建议使用心跳并生成long_running_task()在一个单独的线程中。

于是代码就变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

这似乎可行,但非常混乱。我们是否确定ch对象是线程安全的吗?另外,想象一下long_running_task()正在使用该连接参数将任务添加到新队列(即这个长过程的第一部分已完成,让我们将任务发送到第二部分)。所以,线程正在使用connection目的。该线程安全吗?

更重要的是,这样做的首选方式是什么?我觉得这非常混乱,而且可能不是线程安全的,所以也许我们做得不对。谢谢!


目前,您最好的选择是关闭心跳,如果您阻塞时间过长,这将阻止 RabbitMQ 关闭连接。我正在尝试 pika 的核心连接管理和 IO 循环在后台线程中运行,但它不够稳定,无法发布。

In 鼠兔 v1.1.0 https://pika.readthedocs.io/en/stable/modules/parameters.html这是ConnectionParameters(heartbeat=0)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 pika / RabbitMQ 中处理长时间运行的任务 的相关文章

  • Symfony Messenger 和 Mailer:如何添加绑定密钥?

    我有一个正在运行的 Symfony 4 4 项目 其中包含Messenger和rabbitMQ 我有一个带有 2 个队列的异步传输 transports https symfony com doc current messenger htm
  • 在使用 FromEventPattern 订阅之前捕获事件

    我正在使用 Rx 框架编写消息监听器 我面临的问题是 我正在使用的库使用一个消费者 每当消息到达时就会发布事件 我已经设法通过以下方式消费传入的消息Observable FromEventPattern但我对服务器中已有的消息有疑问 目前我
  • 组在 RabbitMQ 中接收消息,最好使用 Spring AMQP?

    我正在从服务 S 接收消息 该服务将每个单独的属性更改作为单独的消息发布到实体 一个人为的例子是这样的实体 Person id 123 name Something address 如果姓名和地址在同一交易中更新 则 S 将发布两条消息 P
  • 没有连接的 AMQP/RabbitMQ 通道什么时候会死亡?

    我有一个简单的 RabbitMQ 测试程序 随机将消息排队 另一个读取它们 所有这些都使用 Spring AMQP 如果消费者死亡 例如 在没有机会关闭其连接或通道的情况下终止进程 则它尚未确认的任何消息似乎将永远保持未确认状态 我看过很多
  • 如何在 celery 内为每个用户生成队列?

    因此 我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列 我对消息传递和发布 订阅也很陌生 用户将数据推送到那里并进行处理 稍后用户会收到相关通知 我为此做了一个 celery 设置 发现它不能满足我为每个用户分配自己的任务的专用队列
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 何时使用 RabbitMQ 铲子以及何时使用 Federation 插件?

    对于我工作的公司 我们希望使用 RabbitMQ 作为我们的主要消息总线 我们的想法是 每个应用程序都使用自己的虚拟主机进行内部通信 并且通过 shovel 或联合插件 我们可以在多个虚拟主机 甚至可能是多台机器 非集群 之间共享某些类型的
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • Django、RabbitMQ 和 Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧版本的任务?

    所以我有一个 Django 应用程序 它偶尔会向 Celery 发送任务以进行异步执行 我发现 当我在开发中处理代码时 Django 开发服务器知道如何自动检测代码何时发生更改 然后重新启动服务器 以便我可以看到我的更改 然而 我的应用程序
  • 如何在多租户系统中的 RabbitMQ 中使队列私有/安全?

    我已阅读开始使用 http www rabbitmq com getstarted htmlRabbitMQ 提供的指南 甚至还贡献了第六个示例暴风雨 amqp https github com paolo losi stormed amq
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接

随机推荐

  • 拥有字符串映射如何将其与给定字符串进行比较

    我们有像 name location 这样的字符串对的映射 unix 就像绝对位置 a lamyfolder 我们得到了一些位置a lamyfolder mysubfolder myfile 如何找到哪个地图位置最适合给定的网址 例如我们有
  • 通过 DispatchGroup 与 DispatchQueue 访问主队列

    我在一个在后台线程上运行的类中使用 DispatchGroup 偶尔需要更新UI 所以调用如下代码 dispatchGroup notify queue main self delegate moveTo sender self locat
  • 在Spring Boot MVC中添加ShallowEtagHeaderFilter

    我正在尝试调整我的应用程序配置以设置 ETag 支持 我刚刚检查过this https stackoverflow com questions 26151057 add a servlet filter in a spring boot a
  • 如何更改Xamarin菜单栏中的后退按钮?

    这就是我所拥有的 这就是我想要得到的 如果导航堆栈中没有页面 则标题图标将位于左上角 否则将有后退箭头和 后退 文本 我没有找到任何自定义它的选项 有可能吗 如果您使用的话 您可以将箭头更改为汉堡包图标MasterPage在导航页面内 De
  • 如何允许外部访问私有 Azure DevOps NuGet 源

    情况如下 DevOps Org A维护私有 NuGet 提要 DevOps Org B需要在其 Pipelines 中使用上述 feed 中的包 目前的解决方案包括 添加用户U from Org B作为客人Org A具有利益相关者角色的 D
  • 恒等函数在哪里以及为什么有用?

    我明白为什么函数组合很重要 它允许从小而简单的函数构建大而复杂的函数 val f A gt B val g B gt C val h f andThen g compose f and g 该成分符合identity and 关联性 law
  • 产品图片不显示 (Woocommerce)

    我的产品图片出现 但当我点击进入产品页面时 图片被隐藏 只有当我点击时才会出现 某些产品会在其他浏览器上显示 某些产品仅在 Microsoft Edge 上显示 Edit The real problem to that was cloud
  • 替换 PHPUnit 方法 `withConsecutive` (在 PHPUnit 10 中废弃)

    作为方法withConsecutive将在 PHPUnit 10 中删除 在 9 6 中已弃用 我需要将此方法的所有出现替换为新代码 尝试寻找一些解决方案 但没有找到任何合理的解决方案 例如 我有一个代码 this gt personSer
  • Django-graphene 同一模型有多种类型

    我有一个相当大的graphene djangoAPI 为两个应用程序提供支持 我限制对某些字段的访问的第一个方法是拥有多个DjangoObjectTypes对于同一型号 并使用fields限制每种类型可以访问哪些字段 示例Organizat
  • 节点应用程序 docker 映像在本地运行并在 Amazon ECS 上失败

    该应用程序可以在本地正常部署和运行很长一段时间 没有出现任何问题 然而 在 Amazon ECS 上 它似乎总是在空闲运行大约 2 30 分钟后崩溃 怎么了 Dockerfile Set the node alpine base image
  • 计算 SPARQL 中的个体数量

    我对 SPARQL 完全陌生 我想计算这个本体中的参与者数量 http data linkedmdb org directory actor http data linkedmdb org directory actor 我尝试了以下方法
  • 为什么 Mercurial 合并时很笨?如何使拉取/合并更改变得更简单?

    我刚刚开始使用 Mercurial 我想我正在尝试做一些非常简单的事情 一些应该非常典型的事情 但我很困惑为什么它如此复杂 以及为什么它不能按应有的方式工作 国际海事组织 我与朋友共享一些存储库 他做了一些更改并检查了几个文件并推送它们 现
  • 如何在asp.net mvc中回发后清除字段?

    我想知道如何在 ASP NET MVC 回发后清除字段 就像现在 当发生验证错误时 字段会保留用户输入的内容 不过 这很好 当没有发生验证错误时 我希望清除所有字段并显示一条消息 所以现在我使用 ViewData 成功显示 但不确定如何清除
  • Kotlin 多平台:JobCancellationException:父作业已完成

    我尝试编写一个使用 ktor 的 kotlin 多平台库 android 和 ios 因此 我在 kotlins 协程方面遇到了一些问题 When writing tests I always get kotlinx coroutines
  • 通过构建管道将 ASP.NET 应用程序部署到 Azure 应用服务

    我继承了一个 ASP NET 4 7 2 应用程序 它在我的计算机上成功运行和启动 我现在尝试通过 Azure DevOps Pipeline 将其部署到 Azure 应用服务 为了尝试做到这一点 我创建了一个 Azure 构建管道 其中包
  • mod_rewrite 在 URL 中带有尾随句点

    我的 Apache 上有一个 RewriteRule 以使 URL 变得友好 RewriteRule log script php u 1 QSA 这使得http example com log 用户名 http example com l
  • gzipped Parquet 文件在 HDFS for Spark 中可拆分吗?

    在互联网上搜索和阅读有关此主题的答案时 我收到了令人困惑的消息 有人可以分享他们的经验吗 我知道 gzipped csv 不是这样的事实 但也许 Parquet 的文件内部结构是这样的 Parquet 与 csv 的情况完全不同 使用 GZ
  • 通过斯坦福解析器提取所有名词、形容词形式和文本

    我试图通过斯坦福解析器从给定文本中提取所有名词和形容词 我当前的尝试是在 Tree Object 的 getChildrenAsList 中使用模式匹配来定位以下内容 NN paper NN algorithm NN information
  • 从 vscode 的集成终端中打开新的集成终端的命令是什么?

    我正在尝试设置一系列任务 每个任务都需要一个终端 为此 我需要使用一个命令从以前的集成终端打开一个新的集成终端选项卡 有没有办法在vs code集成终端中做到这一点 在 mac 中我会使用 open a Terminal 或者类似的东西tt
  • 在 pika / RabbitMQ 中处理长时间运行的任务

    我们正在尝试建立一个基本的定向队列系统 其中生产者将生成多个任务 一个或多个消费者将一次获取一个任务 处理它并确认消息 问题是 处理过程可能需要 10 20 分钟 而且我们当时没有回复消息 导致服务器与我们断开连接 这是我们消费者的一些伪代