RabbitMQ消息队列,发送消息失败、消息持久化、消费者失败处理方法和发送消息

2023-05-16

项目是使用springboot项目开发的,前是代码实现,后面有分析发送消息失败、消息持久化、消费者失败处理方法和发送消息解决方法及手动确认的模式

先引入pom.xml


<!--rabbitmq-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>  

application 配置文件


spring:
rabbitmq:
  host: IP地址
  port: 5672
  username: 用户名
  password: 密码

RabbitConfig配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 Queue:消息的载体,每个消息都会被投到一个或多个队列。
 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 Producer:消息生产者,就是投递消息的程序.
 Consumer:消息消费者,就是接受消息的程序.
 Channel:消息通道,在客户端的每个连接里,可建立多个channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    public static final String EXCHANGE_A = "my_mq_exchange_A";
    public static final String EXCHANGE_B = "my_mq_exchange_B";
    public static final String EXCHANGE_C = "my_mq_exchange_C";

    public static final String QUEUE_A="QUEUE_A";
    public static final String QUEUE_B="QUEUE_B";
    public static final String QUEUE_C="QUEUE_C";


    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); //设置发送消息失败重试
        connectionFactory.setChannelCacheSize(100);//解决多线程发送消息

        return connectionFactory;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMandatory(true); //设置发送消息失败重试
        return template;

    }
    //配置使用json转递数据
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /*public SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());
        return container;
    }*/

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无 routingkey的概念
     * HeadersExchange: 通过添加属性key - value匹配
     * DirectExchange: 按照routingkey分发到指定队列
     * TopicExchange : 多关键字匹配
     * @return
     */
    @Bean
    public DirectExchange defaultExchange(){
        return new DirectExchange(EXCHANGE_A,true,false);
    }

    @Bean
    public Queue queueA(){
        return  new Queue(QUEUE_A,true);// 队列持久化
    }

    @Bean
    public Queue queueB(){
        return  new Queue(QUEUE_B,true);// 队列持久化
    }

    /**
     * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

}  

生成者


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * 生产者
 */
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public ProducerMessage(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        rabbitTemplate.setReturnCallback(this::returnedMessage);
        rabbitTemplate.setMandatory(true);
    }

    public void  sendMsg (Object content){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);

    }

    /**
     * 消息发送到队列中,进行消息确认
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(" 消息确认的id: " + correlationData);
        if(ack){
            log.info("消息发送成功");
            //发送成功 删除本地数据库存的消息
        }else{
            log.info("消息发送失败:id "+ correlationData +"消息发送失败的原因"+ cause);
            // 根据本地消息的状态为失败,可以用定时任务去处理数据

        }
    }

    /**
     * 消息发送失败返回监控
     * @param message
     * @param i
     * @param s
     * @param s1
     * @param s2
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("returnedMessage [消息从交换机到队列失败]  message:"+message);

    }
}  

消费者


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 消费者
 */

@Slf4j
@Component

public class ComsumerMessage {

    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    public void handleMessage(Message message,Channel channel) throws  IOException{
        try {
            String json = new String(message.getBody());
            JSONObject jsonObject = JSONObject.fromObject(json);
            log.info("消息了【】handleMessage" +  json);
            int i = 1/0;
            //业务处理。
            /**
             * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据
             * 1、有数据则不消费,直接应答处理
             * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
             * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
             */

            //手动应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            log.error("消费消息失败了【】error:"+ message.getBody());
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            // 处理消息失败,将消息重新放回队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }

    }

}  

发送消息:调用生成的方法


import com.zz.blog.BlogApplicationTests;
import com.zz.blog.mq.ProducerMessage;
import net.sf.json.JSONObject;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
public class Message extends BlogApplicationTests {
    @Autowired
    private ProducerMessage producerMessage;

    @Test
    public void sendMessage(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", UUID.randomUUID().toString());
        jsonObject.put("name","TEST");
        jsonObject.put("desc","订单已生成");
        //防止发送消息失败,将发送消息存入本地。

        producerMessage.sendMsg(jsonObject.toString());

    }
}
  

rabbitTemplate的发送消息流程是这样的:
1 发送数据并返回(不确认rabbitmq服务器已成功接收)
2 异步的接收从rabbitmq返回的ack确认信息
3 收到ack后调用confirmCallback函数
注意:在confirmCallback中是没有原message的,所以无法在这个函数中调用重发,confirmCallback只有一个通知的作用

在这种情况下,如果在2,3步中任何时候切断连接,我们都无法确认数据是否真的已经成功发送出去,从而造成数据丢失的问题。

最完美的解决方案只有1种:
使用rabbitmq的事务机制。
但是在这种情况下,rabbitmq的效率极低,每秒钟处理的message在几百条左右。实在不可取。

基于上面的分析,我们使用一种新的方式来做到数据的不丢失。
在rabbitTemplate异步确认的基础上
1 在本地缓存已发送的message
2 通过confirmCallback或者被确认的ack,将被确认的message从本地删除
3 定时扫描本地的message,如果大于一定时间未被确认,则重发

当然了,这种解决方式也有一定的问题
想象这种场景,rabbitmq接收到了消息,在发送ack确认时,网络断了,造成客户端没有收到ack,重发消息。(相比于丢失消息,重发消息要好解决的多,我们可以在consumer端做到幂等)。

消息存入本地:在message 发消息的写数据库中。

消息应答成功,则删除本地消息,失败更改消息状态,可以使用定时任务去处理。

消息持久化:

消费者: 


/**
 * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据
 * 1、有数据则不消费,直接应答处理
 * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
 * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
 */  

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ消息队列,发送消息失败、消息持久化、消费者失败处理方法和发送消息 的相关文章

  • 14.2 shell函数参数

    2 shell函数参数 2 1 位置参数2 2 选项参数2 2 1 getopts getopt的区别2 2 2 getopts的使用2 2 3 getopt的使用 Shell 函数参数的传递和其它编程语言不同 xff0c 没有所谓的形参和
  • protoc和protoc-gen-go-grpc安装及编译

    一 install protocol buffer compiler PB REL 61 34 https github com protocolbuffers protobuf releases 34 curl LO PB REL dow
  • powershell 脚本解压zip文件到指定目录

    span class token keyword Function span Unzip span class token operator span File span class token punctuation span span
  • 不用第三方软件 用DISM命令备份与还原win8系统

    分享一个来自远景论坛的的教程如何通过dism命令给自己的win8系统备份和如何通过dism命令还原系统 用 DISM 命令进行系统备份与还原不需要任何第三方软件 xff0c 是利用 Windows 7 Windows 8 系统自带的 DIS
  • ubuntu20.04+anaconda3+tensorflow-gpu2.1安装

    磁盘分区 WIN系统中 xff0c 右键我的电脑 管理 磁盘管理 xff0c 首先留给Ubuntu一定的空间 xff0c 这里为600G左右 Ubuntu系统盘制作 下载Ubuntu对应版本 xff0c 制作启动盘 Ubuntu安装 U盘启
  • nyist 27 水池数目(dfs搜索)

    xfeff xfeff 水池数目 时间限制 xff1a 3000 ms 内存限制 xff1a 65535 KB 难度 xff1a 4 描述 南阳理工学院校园里有一些小河和一些湖泊 xff0c 现在 xff0c 我们把它们通一看成水池 xff
  • XTUOJ 1176 I Love Military Chess(模拟)

    xfeff xfeff I Love Military Chess Accepted 45 Submit 141Time Limit 1000 MS Memory Limit 65536 KB 题目描述 陆军棋 xff0c 又称陆战棋 xf
  • 数据结构课程设计之一元多项式的计算

    数据结构不是听会的 xff0c 也不是看会的 xff0c 是练会的 xff0c 对于写这么长的代码还是心有余也力不足啊 xff0c 对于指针的一些操作 xff0c 也还是不熟练 xff0c 总出现一些异常错误 xff0c 对于数据结构掌握还
  • 数据结构课程设计之通讯录管理系统

    数据结构的第二个课程设计 xff0c 在c语言课程设计的基础上加以改进 xff0c xff08 加强版 xff09 xff0c 保存一下代码 xff0c 对文件的处理 xff0c 还是有一点一问题 xff0c 还有待改进 include l
  • 在网页中添加音乐

    最近在折腾一个网页 xff0c 对于一个有强迫症的人来说 xff0c 就想在网页中插入音乐 xff0c xff08 当做背景音乐 xff09 xff0c 然后自己百度了好多资料 xff1b 就在这里总结一下 xff1a 第一步 xff1a
  • nyist oj 214 单调递增子序列(二) (动态规划经典)

    单调递增子序列 二 时间限制 xff1a 1000 ms 内存限制 xff1a 65535 KB 难度 xff1a 4 描述 给定一整型数列 a1 a2 an xff08 0 lt n lt 61 100000 xff09 xff0c 找出
  • 思科CCNA第一学期期末考试答案

    1 第 3 层头部包含的哪一项信息可帮助数据传输 xff1f 端口号 设备物理地址 目的主机逻辑地址 虚拟连接标识符 2 IP 依靠 OSI 哪一层的协议来确定数据包是否已丢失并请求重传 xff1f 应用层 表示层 会话层 传输层 3 请参
  • 使用pprof分析在线服务cpu性能

    一 安装pprof go install github com google pprof 64 latest 执行后如果报下面错误 build github com google pprof cannot load embed malfor
  • hexo博客出现command not found解决方案

    由于前一段时间忙于考试 xff0c 也有好久没有去更新博客了 xff0c 今天去添加友链的时候 xff0c 突然发现用不了了 xff0c 出现了conmand not found的提示 xff1a 按照字面上的翻译就是 找不到所使用的命令
  • 思科CCNA第二学期期末考试答案

    1 关于数据包通过路由器传输时的封装和解封的叙述 xff0c 下列哪三项是正确的 xff1f xff08 选择三项 xff09 路由器修改 TTL 字段 xff0c 将其值减 1 路由器将源 IP 更改为送出接口的 IP 路由器保持相同的源
  • Hexo版本升级和Next主题升级之坑

    缘起 差不多用了一年hexo的3 2 0版本 xff0c next主题版本也用的5 0的 xff0c 本来用的好好的 xff0c 但是最近访问其他人的博客 xff0c 发现访问速度比我的提升了不止一点点 xff0c 遂决定折腾一番 过程 H
  • Python中JSON的基本使用

    JSON JavaScript Object Notation 是一种轻量级的数据交换格式 Python3 中可以使用 json 模块来对 JSON 数据进行编解码 xff0c 它主要提供了四个方法 xff1a dumps dump loa
  • 卷积和快速傅里叶变换(FFT)的实现

    卷积运算 卷积可以说是图像处理中最基本的操作 线性滤波通过不同的卷积核 xff0c 可以产生很多不同的效果 假如有一个要处理的二维图像 xff0c 通过二维的滤波矩阵 xff08 卷积核 xff09 xff0c 对于图像的每一个像素点 xf
  • 对数(log)的换算公式

    对数公式的换算 xff0c 对于算法复杂度的推导非常重要 但是我总忘 xff0c 这次特地总结一下常用的对数公式 xff0c 以备后用 名称公式和差 log
  • 从零开始手工实现卷积神经网络CNN以及img2col和col2img的实现方法

    本文主要实现卷积神经网络 xff08 Convolutional Neural Network CNN xff09 中卷积操作的forward和backward函数 CNN主要包括卷积 xff08 Convolution xff09 xff

随机推荐