Unexpected behavior of message sequences in RabbitMQ

2024-02-07

I want to implement a RabbitMQ topology similar to Option 3 described here https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493, with a few differences:

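The new topology should handle a few thousand messages per day. It should have two exchanges: one for the main queues (about 30) and one for the retry and error queues (about 60). I have been following this tutorial https://mac-blog.org.ua/rabbitmq-python-example/ as well as the usual RMQ tutorials and many SO posts. The RMQ server runs in a Docker container.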
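A minimal sketch of how the queue names in such a layout could be derived. It assumes every main queue gets exactly one `.retry` and one `.error` queue, which would match the roughly 30/60 split and the `queue1.error` name that appears in the output further down. The helper name `build_queues_dict` and the names `queue1` to `queue30` are hypothetical.

```python
def build_queues_dict(main_queues):
    """Map each main queue name to its retry and error queue names."""
    return {
        name: {"retry": f"{name}.retry", "error": f"{name}.error"}
        for name in main_queues
    }


# Hypothetical queue names; the real ones are not given in the question.
main_queues = [f"queue{i}" for i in range(1, 31)]
queues_dict = build_queues_dict(main_queues)

# Flatten to the list of queues that would sit behind the retry exchange.
dl_queues = [name for pair in queues_dict.values() for name in pair.values()]

print(len(main_queues), len(dl_queues))  # prints: 30 60
```

A lookup such as `queues_dict['queue1']['error']` then has the same shape as the `queues_dict[routingKey]['error']` access used in the callback.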

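The problem I am facing is that the consumer does not receive all messages, and the order in which messages arrive is unexpected. I also see the same message being rejected twice. Here is my code: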

exchange.py

import datetime
import json
import time

import pika


def callback(self, channel, method, properties, body):
    print("delivery_tag: {0}".format(method.delivery_tag))
    data = json.loads(body)
    routingKey = data.get('routing-key')
    routingKey_dl_error = queues_dict[routingKey]['error']
    print(" [X] Got {0}".format(body))
    print(" [X] Received {0} (try: {1})".format(data.get('keyword'), int(properties.priority)+1))

    # redirect faulty messages to *.error queues
    if data.get('keyword') == 'FAIL':
        channel.basic_publish(exchange='exchange.retry',
                              routing_key=routingKey_dl_error,
                              body=json.dumps(data),
                              properties=pika.BasicProperties(
                                  delivery_mode=2,
                                  priority=int(properties.priority),
                                  timestamp=int(time.time()),
                                  headers=properties.headers))

        print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
        time.sleep(5)
        channel.basic_ack(delivery_tag=method.delivery_tag) #leaving this in creates 1000s of iterations(?!)

    # check number of sent counts
    else:
        # redirect messages that exceed MAX_RETRIES to *.error queues
        if properties.priority >= MAX_RETRIES - 1:
            print(" [!] {0} Rejected after {1} retries".format(data.get('keyword'), int(properties.priority) + 1))
            channel.basic_publish(exchange='exchange.retry',
                                  routing_key=routingKey_dl_error,
                                  body=json.dumps(data),
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,
                                      priority=int(properties.priority),
                                      timestamp=int(time.time()),
                                      headers=properties.headers))
            print(" [*] Sent to error queue: {0}".format(routingKey_dl_error))
            #channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            timestamp = time.time()
            now = datetime.datetime.now()
            expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())

            # to reject job we create new one with other priority and expiration
            channel.basic_publish(exchange='exchange_main',
                                  routing_key=routingKey,
                                  body=json.dumps(data),
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,
                                      priority=int(properties.priority) + 1,
                                      timestamp=int(timestamp),
                                      expiration=str(expire),
                                      headers=properties.headers))

            # send back acknowledgement about job
            channel.basic_ack(delivery_tag=method.delivery_tag) # nack or reject???
            print("[!] Rejected. Going to sleep for a while...")
            time.sleep(5)



def exchange(self):
    # 1 - connect and channel setup
    parameters = "..."
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    # 2 - declare exchanges
    # declares the main exchange to be used by all producers to send messages. External facing
    channel.exchange_declare(exchange='exchange_main',
                             exchange_type='direct',
                             durable=True,
                             auto_delete=False)

    # declares the dead letter exchange. Routes messages to *error and *retry queues. Internal use only
    channel.exchange_declare(exchange='exchange.retry',
                             exchange_type='direct',
                             durable=True,
                             auto_delete=False)

    # 3- bind the external facing exchange to the internal exchange
    #channel.exchange_bind(destination='exchange.retry', source='exchange_main')

    # 4 - declare queues
    # Create durable queues bound to the exchange_main exchange
    for queue_name in self.queueName_list:
        queueArgs = {
            "x-message-ttl": 5000,
            "x-dead-letter-exchange": 'exchange.retry',
            #"x-dead-letter-routing-key": queue_name + '.retry'
        }
        channel.queue_declare(queue=queue_name, durable=True, arguments=queueArgs)

    # Create durable queues bound to the exchange.retry exchange
    '''
    for queue_dl_name in self.queueName_dl_list:
        if queue_dl_name[-5:] == 'retry':
            queueArgs_retry = {
                "x-message-ttl": 5000,
                "x-dead-letter-exchange": 'exchange_main',
                "x-dead-letter-routing-key": queue_dl_name[:-6]
            }
            channel.queue_declare(queue=queue_dl_name, durable=True, arguments=queueArgs_retry)
        else:
            channel.queue_declare(queue=queue_dl_name, durable=True)
    '''
    for queue_dl_name in self.queueName_dl_list:
        channel.queue_declare(queue=queue_dl_name, durable=True)

    # 5 - bind retry and main queues to exchanges
    # bind queues to exchanges. Allows for messages to be saved when no consumer is present
    for queue_name in self.queueName_list:
        channel.queue_bind(queue=queue_name, exchange='exchange_main')

    for queue_dl_name in self.queueName_dl_list:
        channel.queue_bind(queue=queue_dl_name, exchange='exchange.retry')

    # 6 - don't dispatch a new message to worker until processed and acknowledged the previous one, dispatch to next worker instead
    channel.basic_qos(prefetch_count=1)

    # 7 - consume the message
    all_queues = self.queueName_list + self.queueName_dl_list
    for queue in all_queues:
        channel.basic_consume(queue=queue,
                              on_message_callback=self.callback,
                              auto_ack=False)


    print ('[*] Waiting for data for:')
    for queue in all_queues:
        print(' ' + queue)
    print ('[*] To exit press CTRL+C')

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    channel.close()
    connection.close()

producer.py

    # 1 - connect and channel setup                                            
    parameters = "..."

    try:
        connection = pika.BlockingConnection(parameters)
    except pika.exceptions.AMQPConnectionError as err:
        print("AMQP connection failure. Ensure RMQ server is running.")
        raise err

    channel = connection.channel() # create a channel in TCP connection

    # 2 - Turn on delivery confirmations (either a basic.ack or basic.nack)
    channel.confirm_delivery()

    # 3 - send message to rmq
    print(" [*] Sending message to create a queue")
    # set header parameters
    count = 3
    for i in range(1, count + 1):
        if self.keyword is None:
            message = "data {0}".format(i)
        else:
            message = self.keyword
        timestamp = time.time()
        now = datetime.datetime.now()
        expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())

        headers = dict()
        headers['RMQ_Header_Key'] = self.queueName
        headers['x-retry-count'] = 0
        headers['x-death'] = None

        data = {
            'routing-key': self.queueName,
            'keyword': message,
            'domain': message,
            'created': int(timestamp),
            'expire': expire
        }

        # properties are often used for bits of data that your code needs to have, but that aren't part of the actual message body.
        channel.basic_publish(
                              exchange='exchange_main', 
                              routing_key=self.queueName, 
                              body=json.dumps(data),
                              properties=pika.BasicProperties(
                                delivery_mode=2,  # makes persistent job
                                priority=0,  # default priority
                                timestamp=int(timestamp),  # timestamp of job creation
                                expiration=str(expire),  # job expiration
                                headers=headers
                              ))

        print(" [*] Sent message: {0} via routing key: {1}".format(message, self.queueName))

    # 4 - close channel and connection
    channel.close()
    connection.close()

After starting exchange.py, I send the following from the command line in another terminal window: python3 producer.py queue1

The output is:

delivery_tag: 1
 [X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
 [X] Received data 1 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 2
 [X] Got b'{"routing-key": "queue1", "keyword": "data 1", "domain": "data 1", "created": 1567068725, "expire": 47274000}'
 [X] Received data 1 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 3
 [X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
 [X] Received data 3 (try: 1)
[!] Rejected. Going to sleep for a while...
delivery_tag: 4
 [X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
 [X] Received data 3 (try: 2)
[!] Rejected. Going to sleep for a while...
delivery_tag: 5
 [X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
 [X] Received data 3 (try: 3)
 [!] data 3 Rejected after 3 retries
 [*] Sent to error queue: queue1.error
delivery_tag: 6
 [X] Got b'{"routing-key": "queue1", "keyword": "data 3", "domain": "data 3", "created": 1567068725, "expire": 47274000}'
 [X] Received data 3 (try: 3)
 [!] data 3 Rejected after 3 retries
 [*] Sent to error queue: queue1.error

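Questions:

  1. Does my current implementation match the topology I am after?
  2. Direct vs. topic: is a direct exchange the best/most efficient solution in this case?
  3. 1 exchange vs. 2 exchanges: is it advisable to stick with 2 exchanges, or can this be simplified to a single exchange that handles everything?
  4. How do I test for messages that are not normal, i.e. messages sent to the retry-loop part of the callback function? The callback currently does not handle "normal" messages (i.e. messages that trigger no retry, or that simply fail).
  5. Is it necessary to bind the two exchanges to each other? Commenting out that code made no difference.
  6. Do I need to pass arguments (in channel.queue_declare) to both the dead-letter and the non-dead-letter queues? I know the non-dead-letter queues are declared with arguments so that x-dead-letter-exchange is set, but I am not sure whether x-dead-letter-routing-key should be set as well.
  7. Do I need to ack/nack every time a message is published? I noticed different behavior with and without it (i.e. without the ack, FAIL messages are not sent 3 times but only twice, while with the ack they are sent more than 3 times).
  8. In the output above, "data 1" is consumed only twice, "data 2" does not appear at all, and "data 3" reaches the MAX_RETRIES limit of 3 but is then sent to the *.error queue twice (not once), which I find strange. What is RMQ doing here?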
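For question 6, a minimal sketch of what the queue arguments could look like if the commented-out lines in exchange.py were enabled. It is not a confirmed fix. The one RabbitMQ fact it relies on is that a dead-lettered message keeps its original routing key unless x-dead-letter-routing-key is set on the queue it leaves. The function names and the `RETRY_DELAY_MS` constant are hypothetical; the exchange names, the 5000 ms TTL and the `.retry` suffix are taken from the code above.

```python
MAIN_EXCHANGE = "exchange_main"
RETRY_EXCHANGE = "exchange.retry"
RETRY_DELAY_MS = 5000  # same value as x-message-ttl in exchange.py


def main_queue_args(queue_name):
    """Arguments for a main queue: dead-letter into <queue_name>.retry."""
    return {
        "x-message-ttl": RETRY_DELAY_MS,
        "x-dead-letter-exchange": RETRY_EXCHANGE,
        "x-dead-letter-routing-key": queue_name + ".retry",
    }


def retry_queue_args(retry_queue_name):
    """Arguments for a *.retry queue: after the TTL, route back to the main queue."""
    main_queue_name = retry_queue_name[: -len(".retry")]
    return {
        "x-message-ttl": RETRY_DELAY_MS,
        "x-dead-letter-exchange": MAIN_EXCHANGE,
        "x-dead-letter-routing-key": main_queue_name,
    }


def dl_queue_args(queue_dl_name):
    """*.retry queues get arguments; *.error queues are declared without any."""
    if queue_dl_name.endswith(".retry"):
        return retry_queue_args(queue_dl_name)
    return None


print(main_queue_args("queue1"))
print(dl_queue_args("queue1.retry"))
print(dl_queue_args("queue1.error"))
```

Each result would be passed as the `arguments=` parameter of `channel.queue_declare`, as in the declaration loops in exchange.py. Keeping the argument construction in plain functions also makes it possible to check the routing keys without a running broker.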
