我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,一个或多个消费者将一次获取一个任务,处理它并确认消息。
问题是,处理过程可能需要 10-20 分钟,而且我们当时没有回复消息,导致服务器与我们断开连接。
这是我们消费者的一些伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
第一个任务完成后,BlockingConnection 内部深处的某个地方会抛出异常,抱怨套接字已重置。另外,RabbitMQ日志显示消费者因未及时响应而被断开连接(为什么它重置连接而不是发送FIN很奇怪,但我们不会担心这一点)。
我们进行了很多搜索,因为我们相信这是 RabbitMQ 的正常用例(有很多长时间运行的任务应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后我们偶然发现了一个线程,建议使用心跳并生成long_running_task()
在一个单独的线程中。
于是代码就变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
这似乎可行,但非常混乱。我们是否确定ch
对象是线程安全的吗?另外,想象一下long_running_task()
正在使用该连接参数将任务添加到新队列(即这个长过程的第一部分已完成,让我们将任务发送到第二部分)。所以,线程正在使用connection
目的。该线程安全吗?
更重要的是,这样做的首选方式是什么?我觉得这非常混乱,而且可能不是线程安全的,所以也许我们做得不对。谢谢!