我想实现类似于选项 3 的 RabbitMQ 拓扑here https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493,除了一些差异:
新拓扑每天应该处理几千条消息。它应该有两个交换器:一个处理主队列(大约30个),另一个处理重试和错误队列(大约60个)。我一直在关注这个tutorial https://mac-blog.org.ua/rabbitmq-python-example/以及通常的 RMQ 教程,以及许多 SO 帖子。 RMQ 服务器在 Docker 容器中启动。
我面临的问题是消费者并未接收到所有消息,并且接收消息的顺序是意外的。我还看到同一条消息被拒绝两次。这是我的代码:
交换.py
def callback(self, channel, method, properties, body):
print("delivery_tag: {0}".format(method.delivery_tag))
data = json.loads(body)
routingKey = data.get('routing-key')
routingKey_dl_error = queues_dict[routingKey]['error']
print(" [X] Got {0}".format(body))
print(" [X] Received {0} (try: {1})".format(data.get('keyword'), int(properties.priority)+1))
# redirect faulty messages to *.error queues
if data.get('keyword') == 'FAIL':
channel.basic_publish(exchange='exchange.retry',
routing_key=routingKey_dl_error,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority),
timestamp=int(time.time()),
headers=properties.headers))
print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
time.sleep(5)
channel.basic_ack(delivery_tag=method.delivery_tag) #leaving this in creates 1000s of iterations(?!)
# check number of sent counts
else:
# redirect messages that exceed MAX_RETRIES to *.error queues
if properties.priority >= MAX_RETRIES - 1:
print(" [!] {0} Rejected after {1} retries".format(data.get('keyword'), int(properties.priority) + 1))
channel.basic_publish(exchange='exchange.retry',
routing_key=routingKey_dl_error,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority),
timestamp=int(time.time()),
headers=properties.headers))
print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
#channel.basic_ack(delivery_tag=method.delivery_tag)
else:
timestamp = time.time()
now = datetime.datetime.now()
expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())
# to reject job we create new one with other priority and expiration
channel.basic_publish(exchange='exchange_main',
routing_key=routingKey,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2,
priority=int(properties.priority) + 1,
timestamp=int(timestamp),
expiration=str(expire),
headers=properties.headers))
# send back acknowledgement about job
channel.basic_ack(delivery_tag=method.delivery_tag) # nack or reject???
print("[!] Rejected. Going to sleep for a while...")
time.sleep(5)
def exchange(self):
# 1 - connect and channel setup
parameters = "..."
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 2 - declare exchanges
# declares the main exchange to be used by all producers to send messages. External facing
channel.exchange_declare(exchange='exchange_main',
exchange_type='direct',
durable=True,
auto_delete=False)
# declares the dead letter exchange. Routes messages to *error and *retry queues. Internal use only
channel.exchange_declare(exchange='exchange.retry',
exchange_type='direct',
durable=True,
auto_delete=False)
# 3- bind the external facing exchange to the internal exchange
#channel.exchange_bind(destination='exchange.retry', source='exchange_main')
# 4 - declare queues
# Create durable queues bound to the exchange_main exchange
for queue_name in self.queueName_list:
queueArgs = {
"x-message-ttl": 5000,
"x-dead-letter-exchange": 'exchange.retry',
#"x-dead-letter-routing-key": queue_name + '.retry'
}
channel.queue_declare(queue=queue_name, durable=True, arguments=queueArgs)
# Create durable queues bound to the exchange.retry exchange
'''
for queue_dl_name in self.queueName_dl_list:
if queue_dl_name[-5:] == 'retry':
queueArgs_retry = {
"x-message-ttl": 5000,
"x-dead-letter-exchange": 'exchange_main',
"x-dead-letter-routing-key": queue_dl_name[:-6]
}
channel.queue_declare(queue=queue_dl_name, durable=True, arguments=queueArgs_retry)
else:
channel.queue_declare(queue=queue_dl_name, durable=True)
'''
for queue_dl_name in self.queueName_dl_list:
channel.queue_declare(queue=queue_dl_name, durable=True)
# 5 - bind retry and main queues to exchanges
# bind queues to exchanges. Allows for messages to be saved when no consumer is present
for queue_name in self.queueName_list:
channel.queue_bind(queue=queue_name, exchange='exchange_main')
for queue_dl_name in self.queueName_dl_list:
channel.queue_bind(queue=queue_dl_name, exchange='exchange.retry')
# 6 - don't dispatch a new message to worker until processed and acknowledged the previous one, dispatch to next worker instead
channel.basic_qos(prefetch_count=1)
# 7 - consume the message
all_queues = self.queueName_list + self.queueName_dl_list
for queue in all_queues:
channel.basic_consume(queue=queue,
on_message_callback=self.callback,
auto_ack=False)
print ('[*] Waiting for data for:')
for queue in all_queues:
print(' ' + queue)
print ('[*] To exit press CTRL+C')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
channel.close()
connection.close()
生产者.py
# 1 - connect and channel setup
parameters = "..."
try:
connection = pika.BlockingConnection(parameters)
except pika.exceptions.AMQPConnectionError as err:
print("AMQP connection failure. Ensure RMQ server is running.")
raise err
channel = connection.channel() # create a channel in TCP connection
# 2 - Turn on delivery confirmations (either a basic.ack or basic.nack)
channel.confirm_delivery()
# 3 - send message to rmq
print(" [*] Sending message to create a queue")
# set header parameters
count = 3
for i in range(1, count + 1):
if self.keyword is None:
message = "data {0}".format(i)
else:
message = self.keyword
timestamp = time.time()
now = datetime.datetime.now()
expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())
headers = dict()
headers['RMQ_Header_Key'] = self.queueName
headers['x-retry-count'] = 0
headers['x-death'] = None
data = {
'routing-key': self.queueName,
'keyword': message,
'domain': message,
'created': int(timestamp),
'expire': expire
}
# properties are often uses for bits of data that your code needs to have, but aren't part of the actual message body.
channel.basic_publish(
exchange='exchange_main',
routing_key=self.queueName,
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2, # makes persistent job
priority=0, # default priority
timestamp=int(timestamp), # timestamp of job creation
expiration=str(expire), # job expiration
headers=headers
))
print(" [*] Sent message: {0} via routing key: {1}".format(message, self.queueName))
# 4 - close channel and connection
channel.close()
connection.close()
启动后exchange.py
,然后我从另一个终端窗口中的命令行发送:python3 producer.py queue1
然后得到:
delivery_tag: 1
[X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
[X] Received data 1 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 2
[X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
[X] Received data 1 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 3
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 4
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 5
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 3)
[!] data 3 Rejected after 3 retries
[*] Sent to error queue: queue1.error
delivery_tag: 6
[X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
[X] Received data 3 (try: 3)
[!] data 3 Rejected after 3 retries
[*] Sent to error queue: queue1.error
问题:
- 我的代码当前实现是否符合我所需的拓扑?
- 直接与主题:在这种情况下,直接交换是最优化/最有效的解决方案吗?
- 1 个交易所与 2 个交易所:是否建议坚持 2 个交易所,或者是否可以简化为仅用 1 个交易所来处理所有内容?
- 如何测试不正常的消息,即发送到回调函数的重试循环部分的消息?回调当前不处理“正常”消息(即没有触发重试或只是失败的消息)。
- 两个交易所有必要绑定吗?注释掉这段代码没有什么区别。
- 我需要实现参数(
channel.queue_declare
) 到死信和非死信队列?我知道非死信队列将声明参数,从而x-dead-letter-exchange
已设置,但我不确定是否x-dead-letter-routing-key
也应该设置。
- 我需要
ack
/ nack
每次发布消息时,因为我注意到实现和未实现时的行为不同(即,不发送 ACK,FAIL 消息不会发送 3 次,而只会发送两次,而 ack 则发送超过 3 次)
- 在上面的输出中,“数据1”只被消耗了两次,“数据2”根本没有出现,“数据3”到达了
MAX_RETRY
限制为 3 次,但随后被发送到 *.error 队列两次(不是一次),我觉得很奇怪。 RMQ在这里做什么?