SpringBoot整合RabbitMq实现ACK机制--消息回退机制--消息确认机制

2023-05-16

文章目录

    • 1. 环境配置
    • 2. RabbitMq配置
      • 2.1 消息发送确认机制
      • 2.2 消息投递确认机制
      • 2.3 ACK消息签收机制
    • 3. 消息生产者

1. 环境配置

pom.xml

	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
        <!-- rabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 测试类 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

application.yml

server:
  port: 9001
spring:
  rabbitmq:
    host: 127.0.0.1
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息 处理完成才能获取下一条 能者多劳模式 默认为预取机制
        acknowledge-mode: manual # 设置消费端手动ack确认
        retry:
          enabled: true # 是否支持重试
    publisher-confirm-type: correlated # 确认消息已发送到交换机(Exchange)
    publisher-returns: true #  确认消息已发送到队列(Queue) 消息在未被队列接收时返回

2. RabbitMq配置

RabbitAdmin配置

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
        connectionFactory.setPublisherConfirms(true);// 开启消息发布者确认
        connectionFactory.setPublisherReturns(true);// 确认消息发送到队列,如未被队列接收时返回
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

2.1 消息发送确认机制

为确保消息发送的准确性,设置发布时确认,确认消息是否到达 Broker 服务器 消息只要被Broker接收,就会触发 ConfirmCallbackConfig 回调

消息接收确认回调

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 消息只要被RabbitMq Broker接收 就会触发本回调
 * 消息发送确认回调
 */
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执行
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息
     * @param cause           失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) { // 消息投递到broker 的状态,true表示成功
            System.out.println("消息发送到Broker成功!");
        } else { // 发送异常
            System.out.println("发送异常原因 = " + cause);
        }
    }
}

2.2 消息投递确认机制

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作

消息投递机制回调接口

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
* 如果消息未能投递到目标queue里将触发本回调 returnCallback ,
 * 一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作
* */
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct // @PostContruct是spring框架的注解,在⽅法上加该注解会在项⽬启动的时候执⾏该⽅法,也可以理解为在spring容器初始化的时候执
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("returnCallback ..............");
        System.out.println(message);// 消息本身
        System.out.println(i);// index
        System.out.println(s);
        System.out.println(s1);
        System.out.println(s2);// 队列名称
    }
}

2.3 ACK消息签收机制

为确保消息 消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制

以下是三种 channel 签收方式

  1. basicAck 消息确认
  2. basicNack 消息回退
  3. basicReject 消息拒绝

消费者消息确认机制

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.io.IOException;

@Component // 如果把这个注释掉,就不会自动创建 simple.queue队列,则生产消息会触发 returnCallback 回调
public class ACKConfirmListener {
    @RabbitListener(queuesToDeclare = @Queue(value = "simple.queue", durable = "true")) // queuesToDeclare 自动声明队列
    public void ackListener(String msg, Channel channel, Message message) throws IOException {
        try {
            // 消息
            System.out.println("msg = " + msg);
            throw new RuntimeException("消费者故意抛出异常......");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息消费异常,重回队列......");
            /**
             * 出现异常,把消息重新投递回队列中,如一直有异常会一直循环投递
             * deliveryTag:表示消息投递序号。
             * multiple:是否批量确认。
             * requeue:值为 true 消息将重新入队列。
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
        /**
         * 消息确认 ACK
         * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
         * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
         */
        System.out.println("ACK消息消费确认.....")
        
        // 消息确认 basicAck
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
       
        // 消息拒绝 basicReject
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
}

3. 消息生产者

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqTest {

    // 注入 rabbitmq
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    // 直接模式(Direct)
    public void testSend() {
        rabbitTemplate.convertAndSend("simple.queue", "ACK消息确认机制生产者......");
    }

https://blog.csdn.net/qq_48721706/article/details/125194646

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot整合RabbitMq实现ACK机制--消息回退机制--消息确认机制 的相关文章

随机推荐

  • WebPack4学习实践笔记(一)

    一 准备 nodejs安装教程 xff1a https blog csdn net FED AF article details 105747632 xff09 二 安装 xff08 1 xff09 全局安装 初始化npm C Users
  • 可重入锁(又名递归锁)

    可重入锁 指的是同一线程外层函数获得锁之后 xff0c 内层递归函数仍能获取该锁的代码 xff0c 在同一线程的外层获取锁的时候 xff0c 在进入内层方法会自动获取锁 xff0c 也就是说线程可以任意进入它已经拥有的锁的同步代码块 syn
  • Java 读取TXT文件-行读取

    Java 读取TXT文件 行读取 span class token keyword public span span class token keyword static span String span class token funct
  • springMVC 指定文件 压缩下载

    span class token keyword package span span class token namespace com span class token punctuation span lyt span class to
  • HC-SR04超声波模块

    1 硬件原理图 2 传感器参数表 电气参数HC SR04 超声波模块工作电压DC 5 V工作电流15mA工作频率40kHz最远射程4m最近射程2cm测量角度15 度输入触发信号10uS 的 TTL 脉冲输出回响信号输出 TTL 电平信号 x
  • STM32F1系列-UCOSIII配置之delay_init()函数详解

    一 时钟选择 SysTick CLKSourceConfig xff08 xff09 xff1b 选择systick时钟 xff0c 函数配置的寄存器如下所示 找到该函数的定义 xff0c 其两个参数的数值如下 该函数选择的参数是SysTi
  • xfce4-session: Unable to access file /home/user/.ICEauthority: Permission denied

    wsl想要运行startxfce4却出现错误 xff1a usr bin startxfce4 X server already running on display 172 18 64 1 0 xfce4 session Unable t
  • YOLO 目标框回归(三)

    边框预测公式分析 Cx xff0c Cy 是 feature map 中 grid cell 的左上角坐标 xff1b Pw xff0c Ph 是预设的 anchor box 映射到 feature map 中的宽和高 最终得到的边框坐标值
  • 【机器人学】机器人运动学基础

    文章目录 DH table参考MDHSDH 齐次变换矩阵左乘 右乘左右乘与DH参数 欧拉角欧拉角的奇异性欧拉角万向节死锁 四元数齐次变换矩阵 欧拉角 轴角和四元数之间的转化关系机器人pieper准则 DH table DH参数可以说是机器人
  • 【IKFast】IKFast配置与使用

    文章目录 参考Fast of All环境什么是IKFast安装openRave相关Create Collada File For Use With OpenRAVECreate IKFast Solution CPP File验证从仿真验证
  • 2019.11.12-最新大华摄像机SDK开发,预览实时视频并指定码流格式保存到文件中(可观看)

    大华摄像机SDK开发 预览实时视频并指定码流格式保存到文件中 由于本人最近在开发大华摄像机 特此分享一些经验给到各位开发朋友 本次实例是关于大华摄像机的实时预览视频码流保存到文件中的Demo 本人还开发过海康威斯的SDK 如果有需要的话看我
  • 树莓派3下载安装Android系统图文教程

    树莓派3怎样安装Android系统 xff1f 树莓派3相比上一代性能增强很多 xff0c 采用64位四核1 2GHz处理器 xff0c 搭载1GB LPDDR2内存 xff0c 虽然配置仅相当于入门级Android手机 xff0c 但运行
  • 海康威视摄像机SDK二次开发-JavaDemo环境搭建详解

    由于此前写了两篇具体功能实现的博文 但是发现好多小伙伴对SDK不是很熟悉 下载下来不知道怎么跑在系统里 特此写下这篇文章记载一下JavaDemo的初始化准备 初始化Windows SDK Java环境 https blog csdn net
  • 使用IDEA如何对Java项目进行打包

    我们开发的项目大部分都使用jar包交付 那么如何使用IDEA打jar包呢 下面我们写一个最简单的打包方式 1 首先先把项目编译一下 点击顶部工具栏 build gt build project 2 打包项目 点击file 选择此项 按以下顺
  • Docker是什么?它的优点与作用是什么?我们为什么使用docker?

    文章目录 1 我们为什么使用Docker 2 Docker是什么 xff1f 2 1 镜像 概念 xff1a 复制的程序 2 2 容器 概念 xff1a 集装箱 2 3 仓库 概念 xff1a 存放镜像的地方 3 Docker的作用 xff
  • 使用Java-Maven操作MongoDB增删改查

    文章目录 1 环境配置2 单元测试2 1 查询记录2 2 新增记录2 3 更新记录2 4 删除记录 1 环境配置 pom xml 依赖 span class token tag span class token tag span class
  • Centos7-Docker搭建私有仓库与配置

    文章目录 1 私有仓库搭建与配置2 docker 客户端配置3 镜像上传至私有仓库4 客户端拉取镜像 1 私有仓库搭建与配置 搭建私有仓库的作用是为了使用docker的镜像 把开发环境 测试环境 运维环境统一软件版本 使开发测试过程中不会出
  • centos7-Docker {:plugins_not_found, [:“rabbitmq_delayed_message_exchange-XXX.ez“]}

    文章目录 1 下载插件2 Docker 导入插件3 启动插件错误代码 1 下载插件 github 插件下载 最好下载 3 9版本 版本太高不支持 下载成功后上传到服务器中 2 Docker 导入插件 使用命令把插件上传到Docker容器的p
  • springboot集成RabbitMQ---延迟队列---TTL(队列)---死信队列

    文章目录 1 延迟队列1 1 插件下载1 2 延迟队列环境配置1 3 生产者 消费者1 4 结果验证 2 TTL队列2 1 封装TTL队列工具类2 2 结果验证2 3 单条消息设置TTL2 4 结果验证 3 死信队列3 1 死信从何而来3
  • SpringBoot整合RabbitMq实现ACK机制--消息回退机制--消息确认机制

    文章目录 1 环境配置2 RabbitMq配置2 1 消息发送确认机制2 2 消息投递确认机制2 3 ACK消息签收机制 3 消息生产者 1 环境配置 pom xml span class token tag span class toke