RabbitMQ 消息可靠性投递+消费

2023-11-13

RabbitMQ 消息可靠性投递+消费

任何消息中间件发消息投递的可靠性都是开发者选择的重要参考依据。我们希望的是发送的每一条消息都是可以被消费者正确处理的。但是没有哪个消息中间件可以保证消息一定 100% 投递成功,那么如果消息投递失败我们该如何处理呢?

本文消息靠性介绍比较浅,深入可参考:
消息可靠投递(上):https://www.cnblogs.com/mfrank/p/11380102.html
消息可靠投递(下):https://www.cnblogs.com/mfrank/p/11442706.html
https://www.cnblogs.com/ybyn/category/1849978.html

00、消息可靠性理论分析

20210418110834

1、RabbitMQ 消息投递路径

生产者(Producer) ==> 交换机(Exchange) ==> 队列(Queue) ==> 消费者(Consumer);这过程中消息可能丢失的三种情况:

1、生产者投递-丢失消息:Produer 发送消息到 Broker (Exchange) 失败,导致消息发送失败

2、生产者投递-丢失消息:Exchange 投递消息到 Queue 失败,导致消息丢

3、消费者消费-丢失消息:Consumer 从 Queue 中获取消息,但无法正确处理(消费)导致消息丢失

2、RabbitMQ 消息投递解决方案

1、消息事务机制:RabbitMQ是支持AMQP事务机制的,在生产者确认机制之前,事务是确保消息被成功投递的唯一方法。

2、confirmCallback确认模式:处理生产者发送消息到Broker失败场景,生产者投递消息后,如果Broker收到消息后,会返回生产者一个ACK通知。生产者通过ACK可以确定这条消息是否正常发送到Broker

3、returnCallback退回模式:默认情况下交换机投递消息到队列失败是直接丢弃该消息,开启 returnCallback后,如果消息投递失败会通知消息生产者

4、消息确认机制ACK:消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。消费者可以确认消费该消息或者消费失败并放入队列中等待下次继续消费,或者直接拒绝消费该消息

**建议:**开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制。

01、消息事务机制(不推荐)

事务机制能够解决生产者与broker之间消息确认的问题,只有消息成功被broker接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低RabbitMQ的消息吞吐量,不适用于需要发布大量消息的业务场景。**注意:事务是同步的,发送消息之后,等到接收到确认返回之后,才能发送下一条消息。**事务的实现主要是对信道(channel)的设置,主要的方法有三个:

  1. channel.txSelect():声明启动事务模式;
  2. channel.txComment():提交事务;
  3. channel.txRollback():回滚事务;

完整代码:

    try {
        channel.txSelect();
        channel.basicPublish(exchange, routingKey, null, msg.getBytes());
        int result = 1 / 0;
        channel.txCommit();
    } catch (Exception e) {
        channel.txRollback();
    }

RabbitMQ中事务的多了四个步骤:

client发送Tx.Select
broker发送Tx.Select-Ok(之后publish)
client发送Tx.Commit
broker发送Tx.Commit-Ok

如果发生异常事务就会滚,解决了在未到达交换机之前就出现错误,只有消息被成功接收事务才能提交成功否则我们便可以在捕捉异常后进行回滚重发,但是使用事务机制的话会降低rabbit的性能所有mq提供了一个更好的方案!

SpringBoot Config 类配置

package com.example.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TransactionRabbitMQConfig {
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);// 开启事务
        return rabbitTemplate;
    }
}

测试

    /**
     * 消息可靠性投递:事务机制
     * 如果返回异常则已经发送的消息会回滚
     */
    @Test
    @Transactional(transactionManager = "rabbitTransactionManager")
    public void transactionTest(){
        rabbitTemplate.convertAndSend("simple.queue", "test transaction");
        int nextInt = new Random().nextInt(2);
        System.out.println(nextInt);
        int i = 1/nextInt;// 模拟异常
    }

查看simple.queue的情况可以发现,如果发生异常,该消息是不会发送成功。可以尝试注释@Transactional,那么每次发送都会进入到simple.queue队列,不管后面是否发生异常。

02、消息确认模式(消息发送方)

02-1、JAVA 实现

1.普通Confirm方式

package com.example.confirm_return;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author:
 * @description: Producer 简单队列生产者
 */
public class ProducerConfirm1 {
    public static void main(String[] args) throws Exception {

        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 从连接工厂中获取
        Connection connection = connectionFactory.newConnection("生产者");
        // 3: 从连接中打开通道channel
        Channel channel = connection.createChannel();

        // 4: 开启 confirm
        channel.confirmSelect();

        // 5: 创建交换机
         channel.exchangeDeclare("confirm-exchange", BuiltinExchangeType.DIRECT);

        // 6: 尝试发送给一个存在的exchange和一个不存在的exchange,confirm-exchange、confirm-exchange-no
        channel.basicPublish("confirm-exchange-no", "", null, "你好,消息队列!".getBytes());
        // 7: 查询是否发送成功,失败会怕抛异常
        if(channel.waitForConfirms()){
            System.out.println("生产者发布消息成功!");
        }
        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

2.批量Confirm方式

package com.example.confirm_return;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author:
 * @description: Producer 简单队列生产者
 */
public class ProducerConfirm2 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 从连接工厂中获取
        Connection connection = connectionFactory.newConnection("生产者");
        // 3: 从连接中打开通道channel
        Channel channel = connection.createChannel();

        // 4: 开启 confirm
        channel.confirmSelect();

        // 5: 创建交换机
        channel.exchangeDeclare("confirm-exchange", BuiltinExchangeType.DIRECT);

        for (int i = 0; i < 10; i++) {
            // 6: 尝试发送给一个存在的exchange和一个不存在的exchange,confirm-exchange、confirm-exchange-no
            channel.basicPublish("confirm-exchange", "", null, ("你好,消息队列!"+i).getBytes());
        }
        // 7: 当你发送的全部消息,有一个失败时,就直接全部失败 并抛出IOException
        channel.waitForConfirmsOrDie();
        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

3.异步Confirm方式

package com.example.confirm_return;

import com.rabbitmq.client.*;
import java.util.concurrent.TimeUnit;

/**
 * @author:
 * @description: Producer 简单队列生产者
 */
public class ProducerConfirm3 {
    public static void main(String[] args) throws Exception {
        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 从连接工厂中获取
        Connection connection = connectionFactory.newConnection("生产者");
        // 3: 从连接中打开通道channel
        Channel channel = connection.createChannel();

        // 4: 生产者调用confirmSelect 将 channel 设置为 confirm 模式
        channel.confirmSelect();
        // 5: 开启异步回调,查询是否发送成功,失败会怕抛异常
        channel.addConfirmListener(new ConfirmListener() {
            // 成功回调
            @Override
            public void handleAck(long deliveryTag, boolean multiple) {
                System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
            }
            // 失败回调
            @Override
            public void handleNack(long deliveryTag, boolean multiple) {
                System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
            }
        });

        // 6: 创建交换机
        channel.exchangeDeclare("confirm-exchange", BuiltinExchangeType.DIRECT);

        for (int i = 0; i < 10; i++) {
            // 7: 尝试发送给一个存在的exchange和一个不存在的exchange,confirm-exchange、confirm-exchange-no
            channel.basicPublish("confirm-exchange", "", null, ("你好,消息队列!"+i).getBytes());
        }

        // 让主线程休眠,可以每次都看到控制台输出
        TimeUnit.MILLISECONDS.sleep(1000);

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

02-2、SpringBoot 实现

application.properties:开启 ConfirmCallback 配置

# NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法,还能选择simple
spring.rabbitmq.publisher-confirm-type=correlated

生产者配置代码:

/**
 * 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。
 * 生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
 * yaml文件中添加配置 spring.rabbitmq.publisher-confirm-type=correlated
 */
@PostConstruct
public void setConfirmCallback() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData 发送消息时指定的唯一关联数据(消息id)
         * @param ack             投递结果
         * @param cause           失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("消息投递到交换机成功:[correlationData={}]", correlationData);
            } else {
                log.error("消息投递到交换机失败:[correlationData={},原因={}]", correlationData, cause);
            }
        }
    });
}

生产者发送测试:

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void confirmTest(){
        // 消息内容
        Map<String,String> map = new HashMap<>();
        map.put("message","testing confirm function");
        // 设置自定义反馈消息
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        // 发送到并不存在名为“exchange-not”的exchange
        rabbitTemplate.convertAndSend("exchange-not","",map,correlationData);
    }

控制台输出:

2021-05-28 13:35:24.532 ERROR 9028 --- [nectionFactory2] c.e.c.ConfirmAndReturnCallbackConfig     : 
消息投递到交换机失败:[correlationData=CorrelationData [id=13e952a8-98e5-4eb2-831e-ffb553c48cea],原因=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange-dog' in vhost '/', class-id=60, method-id=40)]

03、消息回退模式(消息发送方)

Confirm只能保证消息到达 exchange,无法保证消息可以被 exchange分发到指定 queue。而且 exchange是不能持久化消息的,queue是可以持久化消息。采用 return 机制来监听消息是否从 exchange送到了指定的 queue中

20210418110849

当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。

03-1、JAVA + SpringBoot 实现

Java 示例代码:

package com.example.confirm_return;

import com.rabbitmq.client.*;
import java.io.UnsupportedEncodingException;

/**
 * @author:
 * @description: Producer 简单队列生产者
 */
public class ProducerReturn {
    public static void main(String[] args) throws Exception {

        // 1: 创建连接工厂,设置连接属性
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 从连接工厂中获取
        Connection connection = connectionFactory.newConnection("生产者");
        // 3: 从连接中打开通道channel
        Channel channel = connection.createChannel();

        // 开启return机制,并在发送消息时,指定mandatory为true
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body) throws UnsupportedEncodingException {
                // 当消息没有送达到queue时,才会执行。
                System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
            }
        });

        // 6: 参数3为mandatory:开启return,发送到一个不存在queue
        channel.basicPublish("", "xxx", true, null, "你好,消息队列!".getBytes());

        // 最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

SpringBoot 示例代码:

1、修改application.properties:开启 returnCallback 配置

# 可以确保消息在未被队列接收时返回,如何配置了mandatory=true,可以不用配置publisher-returns
# spring.rabbitmq.publisher-returns=true

# 指定消息在没有被队列接收时是否强行退回还是直接丢弃。true退回,false丢弃.
# 或者代码rabbitTemplate.setMandatory(true);也可以
spring.rabbitmq.template.mandatory=true

注意:publisher-returns 与 mandatory 配置的区别:

spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级
spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true,
spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值
spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定

2、示例代码:

/**
 * 注意下面两项必须同时配置:
 * spring.rabbitmq.publisher-returns=true
 * spring.rabbitmq.template.mandatory=true
 */
@PostConstruct
public void setQueueCallback() {
    // rabbitTemplate.setMandatory(true); // 配置文件mandatory和代码配置其一即可
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        /**
         * @param returnedMessage 所有返回的信息都在里面
         */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]",
                    returnedMessage.getMessage(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText());
        }
    });
}

3、测试代码:

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void returnTest(){
        // 并不存在名为“dog”的routingKey,即投不到现有的queue里
        rabbitTemplate.convertAndSend("not_exist_queue","")
    }

4、控制台输出:

2021-05-28 13:50:02.004 ERROR 8692 --- [nectionFactory1] c.e.c.ConfirmAndReturnCallbackConfig     : 路由到队列失败,[消息内容:(Body:'' MessageProperties [headers=(Body:'' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),交换机:,路由件:not_exist_queue,回复码:312,回复文本:NO_ROUTE]

可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回回调会在消息的确认回调之前。

03-2、备用交换机开启

有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。

而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

不要慌,在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

听的不太明白?没关系,看个图就知道是怎么回事了。

20210418110850

接下来,我们就来设置一下备份交换机(建议生产者和消费者端同时定义):

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BackupRabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "backup.business-exchange";
    public static final String BUSINESS_QUEUE_NAME = "backup.business-queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.backup-exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.backup-queue";
    public static final String BACKUP_WARNING_QUEUE_NAME = "backup.backup-warning-queue";

    // 声明业务 Exchange
    @Bean("businessExchange")
    public DirectExchange businessExchange(){
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .durable(true);

        return (DirectExchange)exchangeBuilder.build();
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key");
    }

    // 声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME)
                .durable(true);
        return (FanoutExchange)exchangeBuilder.build();
    }

    // 声明备份队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // 声明备份队列绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明报警队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(BACKUP_WARNING_QUEUE_NAME).build();
    }

    // 声明备份报警队列绑定关系
    @Bean
    public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue,
                                        @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

}

这里我们使用 ExchangeBuilder 来创建交换机,并为其设置备份交换机:

.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);

为业务交换机绑定了一个队列,为备份交换机绑定了两个队列,一个用来存储不可投递消息,待之后人工处理,一个专门用来做报警用途。

接下来,分别为业务交换机和备份交换机创建消费者:

package com.example.service;

import com.example.config.BackupRabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Slf4j
@Component
public class BackupConsumer {
    /**
     * 正常可以消费的业务队列
     */
    @RabbitListener(queues = BackupRabbitMQConfig.BUSINESS_QUEUE_NAME)
    public void receiveMsgA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("业务队列-收到业务消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }

    /**
     * 备份警告队列:
     * 由于是 Fanout 模式。所以备份队列和警报队列都会收到消息。
     * 这里只监听了备份交换机中的警报队列,没有监听备份队列。
     * 可以在Web端查看到 backup.backup-queue 队列会有消息没有消费
     */
    @RabbitListener(queues = BackupRabbitMQConfig.BACKUP_WARNING_QUEUE_NAME)
    public void receiveMsgB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.error("备份警告队列-发现不可路由消息:{}", msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

接下来我们在发送端代码中分别发送一条可用路由消息和不可用路由消息 测试:

    /**
     * 消息回退模式 + 备份交换机模式
     * 分别发送一条可路由消息和不可路由消息:
     */
    @Test
    public void backupTest(){
        // 发送可用路由
        log.info("消息id:{}, msg:{}", "1", "发送到存在的队列上");
        rabbitTemplate.convertAndSend(BackupRabbitMQConfig.BUSINESS_EXCHANGE_NAME,
                "key", "发送到存在的队列上", new CorrelationData("1"));

        // 发送不可用用路由,这样会直接转发到备用交换机中去(然后分发到指定的路由中)
        log.info("消息id:{}, msg:{}", "2", "发送到不存在的队列上");
        rabbitTemplate.convertAndSend(BackupRabbitMQConfig.BUSINESS_EXCHANGE_NAME,
                "key2","发送到不存在的队列上",new CorrelationData("2"));
    }

生产者查看控制台输出:

2021-06-02 11:39:44.691  INFO 24260 --- [main] : 消息id:1, msg:发送到存在的队列上
2021-06-02 11:39:44.911  INFO 24260 --- [main] : 消息id:2, msg:发送到不存在的队列上

消费者查看控制台输出:

2021-06-02 11:48:10.755  INFO 3452 --- [tContainer#11-1] : 业务队列-收到业务消息:发送到存在的队列上
2021-06-02 11:48:10.755 ERROR 3452 --- [tContainer#12-1] : 备份警告队列-发现不可路由消息:发送到不存在的队列上

这里仅仅使用 error 日志配合日志系统进行报警,如果是敏感数据,可以使用邮件、钉钉、短信、电话等报警方式来提高时效性。

回退模式(开启mandatory参数)与 备用交换机同时使用:

设置 mandatory 参数会让交换机将不可路由消息退回给生产者,而备份交换机会让交换机将不可路由消息转发给它,那么如果两者同时开启,两条消息都可以收到确认成功回调,但是不可用路由消息不会被回退给生产者,而是直接转发给备份交换机。可见备份交换机的处理优先级更高。

04、消息确认机制 ACK(消息接收方)

消息确认机制ACK介绍:

  • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
  • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
  • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked

AMQP 0-9-1 有两种消息 ACK 模式:

  • 自动 ACK 模式
  • 手动 ACK 模式

RabbitMQ中消费者有3种手动确认消息方式:

  • basicAck:表示成功确认,使用此回执方法后,消息会被broker 删除。

  • basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

  • basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

开启手动确认消费配置 + 代码实战

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。Java的话直接使用使用channel操作即可。 SpringBoot的话直接在@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

# 开启手动确认消息,如果消息重新入队,可以配置是否需要重试:retry.enabled=true
spring.rabbitmq.listener.simple.acknowledge-mode: manual
package com.example.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class AckConsumer {
    @RabbitListener(queues = "ack.queue")
    public void processOne(String msg, Message message, Channel channel) throws IOException {
        // 该条消息的消息编号,Long类型,递增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            /**
             * 处理成功 手动ACK回执
             * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。
             *              手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
             * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
             *           假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,
             *           当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认
             */
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 当消息处理异常时,将消息重新放回队列,重新排队。如下两种选择其一即可
            /**
             * deliveryTag:消息编号 递增的
             * multiple:是否批量处理
             * requeue:被拒绝的是否重新入队列. true:让消息重新回到队列,false:直接丢弃
             */
            channel.basicNack(deliveryTag, false, false);
            /**
             * deliveryTag:消息编号 递增的
             * requeue:被拒绝的是否重新入队列. true:让消息重新回到队列,false:直接丢弃
             */
            channel.basicReject(deliveryTag,false);
        }
    }
}

05、消息重试机制(消息接收方)

参考:https://www.cnblogs.com/ybyn/p/13691058.html

# 消费者自动ack
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 开启支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数是5次(包含自身消费的一次)
spring.rabbitmq.listener.simple.retry.max-attempts=5
# 重试最大间隔时间
spring.rabbitmq.listener.simple.retry.max-interval=10000
# 重试初始间隔时间
spring.rabbitmq.listener.simple.retry.initial-interval=2000
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
spring.rabbitmq.listener.simple.retry.multiplier=2

可以看到重试次数是5次(包含自身消费的一次),重试时间依次是2s,4s,8s,10s(上一次间隔时间*间隔时间乘子),最后一次重试时间理论上是16s,但是由于设置了最大间隔时间是10s,因此最后一次间隔时间只能是10s,和配置相符合。

注意:

重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系;

因此上述消费者代码不能添加try{}catch(){},一旦捕获了异常,在自动ack模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的;

TODO:还可以继续研究

06、消息限流策略(消息接收方)

1:为什么要限流:若队列中消息积压过多,突然开启监听,会导致消费端崩溃。

2:如何限流:使用RabbitMQ提供的Qos(服务质量保证)功能,如果一定数目消息的未被应答前,不再接受新消息。

3:代码测试:需要开启手动应答模式

# 开启消费者手动消息应答
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.example.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Slf4j
@Component
public class QosConsumer {
    @RabbitListener(queues = "qos-queue")
    public void revice(String msg, Message message, Channel channel) throws IOException {
        try {
            log.info("消息ID:" + message.getMessageProperties().getHeader("spring_returned_message_correlation"));
            log.info("消息标签:" + message.getMessageProperties().getDeliveryTag());
            /**
             * 设置Qos机制:
             * 参数 1 prefetchSize:单条消息的大小(0表示即无限制)
             * 参数 2 prefetchCount:每次处理消息的数量
             * 参数 3 global:是否为consumer级别(false表示仅当前channel有效)
             */
            channel.basicQos(0, 1, false);
            //手动应答消息  第一个参数是所确认消息的标识,第二参数是是否批量确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            log.error("消息ID:" + message.getMessageProperties().getHeader("spring_returned_message_correlation"));
            log.error("接收消息发送错误:" + e.getMessage());
        }
    }
}

XX、消息可靠性踩坑日志

参考:https://juejin.cn/post/6844904205438681095

01、消息不确认

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

02、消息无限投递

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        try {
            log.info("消费者 1 号收到:{}", msg);
            int a = 1 / 0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

TODO:还可继续深入

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ 消息可靠性投递+消费 的相关文章

  • 按下按钮并在java中的新窗口中打开文件

    我创建了一个 JFrame 并放置了一个文本字段和按钮 在文本字段中我放置了从文本文件读取的名称 我知道我想单击按钮并打开一个已知窗口 我想在其中放置名称 其他信息来自同一个文件 这是我的代码 这是我的主框架 package Fronten
  • 如何在由子控件组成的 SWT 复合材料上跟踪鼠标?

    我创建了自己的控件 我想跟踪鼠标并添加一个MouseTrackListener 很遗憾MouseEnter and MouseLeave当鼠标移动到我的合成部分 即标签和按钮 上时 也会生成事件 Mouse enter mouse ente
  • Java Logger 未记录到 Netbeans 中的输出

    我正在 Netbeans 中使用 Maven 启动一个 Java 项目 我编写了一些代码来使用 Logger 类进行日志记录 但是 日志记录似乎不起作用 在程序开始时 我运行 Logger getLogger ProjectMainClas
  • 两个整数乘积的模

    我必须找到c c a b mod m a b c m 是 32 位整数 但 a b 可以超过 32 位 我正在尝试找出一种计算 c 的方法 而不使用 long 或任何 gt 32 位的数据类型 有任何想法吗 如果m是质数 事情可以简化吗 注
  • 垃圾收集器如何在幕后工作来收集死对象?

    我正在阅读有关垃圾收集的内容 众所周知 垃圾收集会收集死亡对象并回收内存 我的问题是 Collector 如何知道任何对象已死亡 它使用什么数据结构来跟踪活动对象 我正在研究这个问题 我发现GC实际上会跟踪活动对象 并标记它们 每个未标记的
  • 提供节点名或服务名,或未知 Java

    最近我尝试运行我的 Java 项目 每当我运行它并将其打开到我得到的服务器地址时 Unable to determine host name java net UnknownHostException Caused by java net
  • 在 Java 中如何找出哪个对象打开了文件?

    我需要找出答案哪个对象在我的 Java 应用程序中打开了一个文件 这是为了调试 因此欢迎使用工具或实用程序 如果发现哪个对象太具体了 这class也会很有帮助 这可能很棘手 您可以从使用分析器开始 例如VisualVM http visua
  • Java 中如何将 char 转换为 int? [复制]

    这个问题在这里已经有答案了 我是Java编程新手 我有例如 char x 9 我需要得到撇号中的数字 即数字 9 本身 我尝试执行以下操作 char x 9 int y int x 但没有成功 那么我应该怎么做才能得到撇号中的数字呢 ASC
  • Sun 在 EDT 之外做 GUI 工作的演示?

    我正在看SplashDemo java http download oracle com javase tutorial uiswing examples misc SplashDemoProject src misc SplashDemo
  • 如何在 ant 中为 junit 测试设置 file.encoding?

    我还没有完全完成file encoding 和 ant https stackoverflow com questions 1339352 how do i set dfile encoding within ants build xml
  • 提高 PostgreSQL 1 亿数据左连接查询性能

    我在用Postgresql 9 2 version Windows 7 64 bit RAM 6GB 这是一个Java企业项目 我必须在我的页面中显示订单相关信息 有三个表通过左连接连接在一起 Tables TV HD 389772 行 T
  • 在 Spring 上下文中查找方法级自定义注释

    我想知道的是 所有的类 方法Spring http en wikipedia org wiki Spring Framework注释为 Versioned的bean 我创建了自定义注释 Target ElementType METHOD E
  • Hibernate 本机查询 - char(3) 列

    我在 Oracle 中有一个表 其中列 SC CUR CODE 是 CHAR 3 当我做 Query q2 em createNativeQuery select sc cur code sc amount from sector cost
  • 列表过滤器内的 Java 8 lambda 列表

    示例 JSON id 1 products id 333 status Active id 222 status Inactive id 111 status Active id 2 products id 6 status Active
  • Android View Canvas onDraw 未执行

    我目前正在开发一个自定义视图 它在画布上绘制一些图块 这些图块是从多个文件加载的 并将在需要时加载 它们将由 AsyncTask 加载 如果它们已经加载 它们只会被绘制在画布上 这工作正常 如果加载了这些图片 AsyncTask 就会触发v
  • Java/Python 中的快速 IPC/Socket 通信

    我的应用程序中需要两个进程 Java 和 Python 进行通信 我注意到套接字通信占用了 93 的运行时间 为什么通讯这么慢 我应该寻找套接字通信的替代方案还是可以使其更快 更新 我发现了一个简单的修复方法 由于某些未知原因 缓冲输出流似
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • 由 Servlet 容器提供服务的 WebSocket

    上周我研究了 WebSockets 并对如何使用 Java Servlet API 实现服务器端进行了一些思考 我没有花费太多时间 但在使用 Tomcat 进行一些测试时遇到了以下问题 如果不修补容器或至少对 HttpServletResp
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • Java 和/C++ 在多线程方面的差异

    我读过一些提示 多线程实现很大程度上取决于您正在使用的目标操作系统 操作系统最终提供了多线程能力 比如Linux有POSIX标准实现 而windows32有另一种方式 但我想知道编程语言水平的主要不同 C似乎为同步提供了更多选择 例如互斥锁

随机推荐

  • Linux 反引号、单引号、双引号简析

    文章目录 一 引号的作用 二 反引号 三 单引号 四 双引号 五 引号解释顺序 一 引号的作用 1 将多个因为空格或者回车等分隔符隔开的字符串合在一起 避免被命令行解析分开 例如 a b c 是一个字符串 而不会像 a b c 这样会被解析
  • 6.基于STM32C8T6的四旋翼无人机的飞控制作----实践操作1,AD电路板绘制-前期准备

    飞控的各种芯片是焊接在电路板上的 电路板是各种传感器和接口的载体 电路板的设计过程也是极其考验耐心和仔细程度的 在大公司里都是每个部门负责其中的一部分工作 如原理图设计 布局布线 测试等等 在小公司和大学生的设计中 经常需要一个人走完全部的
  • 【Linux杂学】CMake:编写 CMakeLists

    CMake官网 掌握CMake 1 变量 CMake 变量名称区分大小写 且只能包含字母数字字符和下划线 CMAKE 变量 存储均为字符串 是CMake 自定义变量 命名应避开 set 可设变量值 第一参数是变量名 其余参数是值 多个参数被
  • Kylin 大数据下的OLAP解决方案和行业典型应用

    最近工作中应用到了 Kylin 因此调研了 Kylin的原理和行业应用 本文参考了官网和众多其他公司中 Kylin的应用案例 文末给出了出处 希望对大家有帮助 Apache Kylin的原理和技术架构 Apache Kylin 从数据仓库中
  • 火狐浏览器关闭百度热榜(屏蔽网站特定元素)

    2023 1 26更新 火狐浏览器不能下载 uBlock Origin 解决方法 https github com AdguardTeam AdguardBrowserExtension releases How to install be
  • 一、voc数据集按比例划分train、val

    下载的voc数据集 images annotations 经下代码 按比例划分成 只需要修改中文批注部分路 import os import sys import random import shutil if name main trai
  • Geforce 错误代码 ERROR CODE:0x0003问题方法

    笔者在360驱动大师安装了geforce驱动 打开geforce遇到报错0x0003 重启和重装都无效 解决办法是到nvidia官网重新安装了官网的geforce驱动 然后就能打开了
  • ARP协议

    一 ARP协议的简介 1 在网络通讯时 源主机的应用程序只知道目的主机的IP地址和端口号 却不知道目的主机的硬件地址 而数据包首先是被网卡接收到再去处理上层协议的 如果接收到的数据包的硬件地址与本机不符 则直接丢弃 因此在通讯前必须获得目的
  • pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn pym

    这里写自定义目录标题 欢迎使用Markdown编辑器 新的改变 功能快捷键 合理的创建标题 有助于目录的生成 如何改变文本的样式 插入链接与图片 如何插入一段漂亮的代码片 生成一个适合你的列表 创建一个表格 设定内容居中 居左 居右 Sma
  • HCIP——OSPF知识点

    目录 一 OSPF协议的简介 二 OSPF的五种数据包 三 OSPF协议的7种状态机 四 OSPF 的工作过程 五 OSPF的基础配置 六 扩展配置 七 OSPF的LSA 八 OSPF的不规则区域 一 OSPF协议的简介 Ospf 开放式最
  • 3D渲染速度慢,花重金买显卡还是用云渲染更划算

    3D渲染对建筑师和设计师来说并不陌生 3D渲染的过程中出现渲染卡顿 特殊材质难以渲染 或者本地配置不足 本地渲染资源不够时 常常会影响工作效率 本文比较了3D渲染时 为提高工作效率 买显卡还是用云渲染更划算 希望对大家有帮助 3D渲染速度慢
  • 电脑麦克风输入没声音,如何解决

    文章目录 一 麦克风输入没声音的原因 二 解决办法 1 打开麦克风隐私权限 2 设置更换输入设备 3 打开麦克风设置 4 更新声卡驱动 重启电脑 5 设备损坏 更换设备 一 麦克风输入没声音的原因 麦克风没声音 麦克风设置问题或硬件损坏问题
  • 【数据集】浙大动态人类3d数据集LightStage

    LightStage LightStage是一个多视图数据集 在NeuralBody中提出 该数据集使用具有 20 同步摄像头的多摄像头系统捕获多个动态人类视频 人类执行复杂的动作 包括旋转 太极 手臂摆动 热身 拳击和踢腿 我们提供使用E
  • innodb存储引擎

    文章目录 1 innodb存储引擎概述 2 innodb体系架构 2 1后台线程 2 2内存 1 缓冲池 2 LRU list 和 Flush list 和Free list 3 重做日志缓冲 4 额外的内存池 2 4Checkpoint技
  • 在AIX4.3.3 ; AIX5.1 和 AIX5.2上安装OpenSSH

    在AIX4 3 3 AIX5 1 和 AIX5 2上安装OpenSSH 在AIX4 3 3 AIX5 1 和 AIX5 2上安装OpenSSH 一 在IBM AIX4 3 3 上安装OpenSSH At 4 3 3 the openSSH
  • 一战上岸北京211 初试+复试 408错题笔记

    趁着现在还记着点复试的内容我先把复试的内容捋一遍 先是政治问题 都是大概意思 假如导师给你分配的事情比较多 你心情会发生什么样的变化 怎么看待近两年中国的抗疫历程 我国把人民生命健康放在第一位说明了什么 怎么看待网络打赏行为 应该还有一个问
  • 树莓派Ubuntu20.04安装ros系统

    第一位大佬的博文 第二位大佬的博文 首先设置软件源 这里可以是官方源也可以是镜像 由于我官方源就成功了 所以没用镜像源 sudo sh c echo deb http packages ros org ros ubuntu lsb rele
  • AI实战训练营(Class 5)MMPretrain代码实战

    AI实战训练营 Class 5 MMPretrain代码实战 1 安装MMPretrain 首先安装openmim工具 从源码安装mmpretrain 通过下面的命令安装多模态版本的 mmpretrain 2 熟悉MMPretrain 猫狗
  • Traceback (most recent call last): File "", line 1, in ImportError: No module named

    在学习python的过程中会遇到如下错误 gt gt gt import mytest Traceback most recent call last File
  • RabbitMQ 消息可靠性投递+消费

    RabbitMQ 消息可靠性投递 消费 任何消息中间件发消息投递的可靠性都是开发者选择的重要参考依据 我们希望的是发送的每一条消息都是可以被消费者正确处理的 但是没有哪个消息中间件可以保证消息一定 100 投递成功 那么如果消息投递失败我们