Pika SelectConnection 适配器的 close() 方法不会关闭连接

2024-06-22

我有一个简单的 AMQP/RabbitMQ 异步消费者,使用 Pika 库用 Python 编写,并基于异步消费者示例 http://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html来自皮卡文档。主要区别是我想在线程中运行我的线程,并且希望它正确关闭连接,然后在一定时间间隔后退出(即终止线程)。以下是我打开连接和设置超时的方法。我还打开一个通道,创建一个交换并绑定一个队列......所有这些都工作正常。

def connect(self):
  LOGGER.info('OPEN connection...')
  return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False)

def on_connection_open(self, unused_connection):
  LOGGER.info('Connection opened')
  self.add_on_connection_close_callback()
  self._connection.add_timeout(5, self.timer_tick)
  self.open_recv_channel()

这是超时回调:

def timer_tick(self):
  LOGGER.info('---TICK---')
  self._stop()

这是 _stop 方法:

def _stop(self):
  LOGGER.info('Stopping...')
  self._connection.close()
  LOGGER.info('Stopped')
  time.sleep(5)
  self._connection.ioloop.stop()

这是启动线程的 run 方法:

def run(self):
  print "-Run Started-"
  self._connection = self.connect()
  self._connection.ioloop.start()
  print "-Run Finished-"

这是 main() 的主要部分:

client = TestClient()
client.start()
client.join()
LOGGER.info('Returned.')
time.sleep(30)

我的问题是“self._connection.close()”无法正常工作。我添加了一个 on_close 回调:

self._connection.add_on_close_callback(self.on_connection_closed)

但 on_connection_close() 永远不会被调用。此外,连接未关闭。我可以在 RabbitMQ 管理 Web 界面中看到它,甚至在线程结束后它仍然存在。这是输出:

-Run Started-
2015-01-28 14:39:28,431: OPEN connection...
2015-01-28 14:39:28,491: Queue bound
(...[snipped] various other messages here...)
2015-01-28 14:39:28,491: Issuing consumer related RPC commands
2015-01-28 14:39:28,491: Adding consumer cancellation callback
(Pause here waiting for timeout callback)
2015-01-28 14:39:33,505: ---TICK---
2015-01-28 14:39:33,505: Stopping...
2015-01-28 14:39:33,505: Closing connection (200): Normal shutdown
2015-01-28 14:39:33,505: Stopped
-Run Finished-
2015-01-28 14:39:39,507: Returned.

“关闭连接(200):正常关闭”来自 Pika,但是我的 on_close 或 on_cancel 回调都没有被调用,无论我是从关闭通道开始,还是只是关闭连接。唯一有效的是用“basic_cancel”停止消费者,这会导致我的“on_cancel_callback”被调用。

我想在主程序中使用循环来创建和销毁消费者线程,但目前,每次运行一个循环时,都会留下一个孤立的连接,因此我的连接数会无限期地增加。当程序关闭时,连接就会消失。

使用 connection.close() 应该可以工作:皮卡文档 http://pika.readthedocs.org/en/latest/modules/adapters/select.html:

close(reply_code=200,reply_text='正常关闭')

断开与 RabbitMQ 的连接。如果有任何打开的通道,它将在完全断开连接之前尝试关闭它们。具有活动消费者的通道将尝试向 RabbitMQ 发送 Basic.Cancel,以在关闭通道之前彻底停止消息的传递。


如果您在线程之间共享连接,这可能会导致问题。pika不是线程安全的,连接不应该被不同的线程使用。

第一点 http://pika.readthedocs.org/en/latest/faq.html常见问题解答:

Q:

Pika 线程安全吗?

A:

Pika 的代码中没有任何线程的概念。如果您想通过线程使用 Pika,请确保每个线程都有一个在该线程中创建的 Pika 连接。跨线程共享一个 Pika 连接是不安全的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pika SelectConnection 适配器的 close() 方法不会关闭连接 的相关文章

  • Python __init__ * 参数 [重复]

    这个问题在这里已经有答案了 我对 Python 还很陌生 我想使用这个库 但是 该类的构造函数中有一个参数 我找不到任何相关信息 init方法如下所示 def init self ain1 ain2 bin1 bin2 microsteps
  • Django表单中的隐藏字段不在cleaned_data中

    我有这个表格 class CollaboratorForm forms Form user forms CharField label Username max length 100 canvas forms IntegerField wi
  • 从 java 代码运行 Python 脚本

    这是我第一次在java中尝试python 我正在尝试从我的代码执行 python 脚本 如下所示 Process process Runtime getRuntime exec python C Users username Desktop
  • 获取父类名? [复制]

    这个问题在这里已经有答案了 class A object def get class self return self class class B A def init self A init self b B print b get cl
  • Django CollectStatic 启动大文件上传时管道损坏

    我正在尝试使用collectstatic将静态文件上传到我的S3存储桶 但我收到一个700k javascript文件的管道损坏错误 这就是错误 Copying Users wedonia work asociados server aso
  • 为什么Flask后台线程获取错误的数据库信息?

    为了将实时数据库信息推送到客户端 我在服务器端使用flask socketio 通过使用websocket将所有实时数据库信息推送到客户端 我的视图文件有一个片段 from models import Host from flask soc
  • 使用 pytherejs 嵌入小部件:错误的视角和相机观察

    我在用pythreejs可视化一些 3D 模型 在 Jupyter 笔记本上可视化模型时 一切都按预期进行 但是当尝试将小部件嵌入 HTML 文档时 我面临两个问题 看起来相机在加载时正在查看 0 0 0 而不是预期的那样 一旦您与小部件交
  • 如何为 PyYAML 编写代表程序?

    我想要一个自定义函数来序列化任意 python 对象 就像 json dump 函数有一个名为 default 的可选参数 如果对象不是 json 可序列化的 它应该是 json 转储器将调用的函数 我只是想从 json 包中执行相当于此操
  • 将 postgres 连接到 django 时遇到问题

    以下文档来自Django Postgres 文档 https docs djangoproject com en 4 1 ref databases postgresql notes我添加到我的settings py 在我设置的设置中 DA
  • python osmnx - 仅提取一个国家的大型高速公路

    我知道可以通过 OSMNX python 包提取城市的道路网络 详情请参阅https geoffboeing com 2016 11 osmnx python street networks https geoffboeing com 20
  • 将 gtk.DrawingArea 保存到文件

    我想使用 PIL 将 gtk DrawingArea 对象内容保存到 jpeg 文件 我特别想添加这个脚本 http pygstdocs berlios de pygst tutorial webcam viewer html制作照片的可能
  • python请求ssl握手失败

    每次我尝试这样做 requests get https url 我收到这条消息 import requests gt gt gt requests get https reviews gethuman com companies Trace
  • numpy.polyval() 的反函数

    我想知道 np polyval 是否有一个方便的反函数 我在其中给出 y 值并求解 x 我知道我可以做到这一点的一种方法是 import numpy as np Set up the question p np array 1 1 10 y
  • Python - 从一定范围内随机采样,同时避免某些值

    我一直在阅读有关random sample 函数在random模块 但没有看到任何可以解决我的问题的东西 我知道使用random sample range 1 100 5 会给我来自 人群 的 5 个独特样本 我想得到一个随机数range
  • 抓取 Shopee API v4

    我有一个最终项目 其中我想要检索的数据是通过在shopee上抓取数据来获取的 但是当我在隐藏的API上抓取shopee时遇到问题 当我在Insomnia脚本上尝试时 脚本会运行 但是当我尝试时在本地或 google colab 脚本上 这是
  • 如何使用 opencv python 根据检测到的物体的位置生成其热图

    我需要根据对象的位置生成其热图 示例 视频帧中检测到的绿色球 如果它长时间停留在某个位置 那么该位置应该是红色的 并且球在短时间内经过的帧中的位置必须是蓝色的 这样我就需要生成热图 提前致谢 那么你在这里可以做的是 1 首先定义一个热图作为
  • tkinter 库 treectrl 转换为 exe 安装程序时出现 cx_freeze 错误

    我使用的是 python 版本 3 7 我使用了这个名为 treectrl 的外部库 当我运行 py 文件时它工作得很好 但是当我使用 cx freeze 转换为 exe 文件时 它给了我错误 NomodulleFound 名为 tkint
  • 导入错误:无法导入名称

    我有一个名为 google translate python 的库 https github com terryyin google translate python https github com terryyin google tra
  • 如何限制单元测试的最大运行时间?

    我目前正在运行一些单元测试 这些测试可能需要很长时间才能失败或无限期地运行 在成功的测试运行中 它们总是会在一定的时间内完成 是否可以创建一个 pytest 单元测试 如果在一定时间内未完成 该测试就会失败 您可以安装 pytest tim
  • 无法将 类型的对象转换为张量

    我正在编写一个使用 Flask 框架的客户端 python 文件 并在 docker 机器中运行它 因此 这需要一个输入文件并生成它的输出 但它会抛出无法转换为张量的错误 tf app flags DEFINE string server

随机推荐