RabbitMQ与SpringBoot整合实战

2023-10-29

SpringBoot整合RabbitMQ
SpringBoot与RabbitMQ集成非常筒単,不需要做任何的额外设置只需要两步即可:
step1:引入相关依赖:spring-boot-starter-amqp
step2:対application.properties迸行配置

生产端核心配置

消费端核心配置

SpringBoot整合RabbitMQ实战
1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring Starter Project


2.添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
然后复制一份工程,重命名为rabbitmq-springboot-consumer,修改pom相关artifactId,name及description

生产端工程
添加生产端配置

# rabbitmq连接基本配置
spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
创建配置类

@Configuration
@ComponentScan({"com.rxy.springboot.*"})
public class MainConfig {

}
消息的confirm和return机制
publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback
注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效
生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等
创建生产端处理类

import java.util.Map;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //确认机制
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * correlationData: 回调的相关数据,包含了消息ID
         * ack: ack结果,true代表ack,false代表nack
         * cause: 如果为nack,返回原因,否则为null
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //做一些补偿机制等
                System.err.println("异常处理....");
            }
        }
    };
    //返回机制
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, 
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        //注意导包
        Message msg = MessageBuilder.createMessage(message, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 ,保证全局唯一 ,这个是实际消息的ID
        //在做补偿性机制的时候通过ID来获取到这条消息进行重发
        String id = "1234567890";
        CorrelationData correlationData = new CorrelationData(id);
        //exchange, routingKey, object, correlationData
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
}
在管控台创建topic交换机exchange-1和队列queue-1,并建立绑定关系为springboot.#
测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqSpringbootProducerApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
    @Test
    public void testSender1() throws Exception {
         Map<String, Object> properties = new HashMap<>();
         properties.put("number", "12345");
         properties.put("send_time", simpleDateFormat.format(new Date()));
         rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    }
}
运行测试方法,打印如下内容

correlationData: CorrelationData [id=1234567890]
ack: true
将发送消息convertAndSend的routingKey修改为spring.hello,再次运行测试方法,打印如下内容

return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTE
correlationData: CorrelationData [id=1234567890]
ack: true
消费端工程
首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列(不推荐)、根据业务记录日志等处理
可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况
@RabbitMQListener注解
消费端监听@RabbitMQListener注解,这个在实际工作中非常的好用。
@RabbitListener是一个组合注解,里面可以注解配置:
@QueueBinding、@Queue、 @Exchange直接通过这个组合注解一次性搞定消费端交换机、 队列、绑定、路由、并且配置监听功能等
由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置

消费端配置

spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10
消费端消息处理类

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.rxy.springboot.entity.Order;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}
前面生产端已经发送了一条消息到queue-1队列,可以再运行发送一条,然后运行消费端工程启动类Application,打印如下

--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!
--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!
其实@RabbitListener会自动声明队列、交换机及绑定关系,可以在管控台删除对应的队列和交换机,然后重新运行进行测试

使用配置方式
将队列、交换机、绑定关系使用配置方式,并且消息体内容使用java对象

首先增加相关的配置
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
另外两个工程都增加实体类Order,注意要发送java对象,必须实现序列化接口

public class Order implements Serializable {
    private String id;
    private String name;
}
消费类增加Order对象消息的处理方法
@Payload: 接收消息的消息体对象
@Headers: 接收消息的属性
AmqpHeaders: 抽象类,里面包含了消息的常用属性key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, 
            Channel channel, 
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
生产端发送方法,直接发送java对象

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Order order) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("0987654321");
        rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
    }
生产端测试方法

    @Test
    public void testSender2() throws Exception {
         Order order = new Order("001", "第一个订单");
         rabbitSender.sendOrder(order);
    }
运行说明
先启动消费端,然后启动生产端

# 生产端打印
correlationData: CorrelationData [id=0987654321]
ack: true
# 消费端打印
--------------------------------------
消费端order: 001
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ与SpringBoot整合实战 的相关文章

随机推荐

  • Gradle Springboot Web热部署的设置

    前言 在我们平时开发当中 需要实时的要查看我们的编码效果 这个时候如果我们设置了热部署 就免去了我们频繁的重启本地项目 热部署我们需要用到spring boot devtools这个组件 这里我们采用的gradle来管理项目 所以采用的ma
  • LeetCode 226. 翻转二叉树

    题目链接 https leetcode cn com problems invert binary tree 先序遍历 Java 代码 class Solution public TreeNode invertTree TreeNode r
  • 我的世界ess服务器信息,我的世界ess指令怎么用 ess指令大全及用法详解

    我的世界ess指令都有哪些 作为风靡全球的沙盒游戏 我的世界带给玩家太多的乐趣 为了能更方便的游戏 ess指令能帮助我们更好的游戏 很多新手玩家刚接触就被搞晕了 这么多的指令看起来有些复杂 下面就由小编给大家带来 我的世界ess指令都有哪些
  • mybatis进行批量插入 返回批量插入主键ID 插入不成功等问题

    这篇博文讲的是批量插入的例子 dao层框架用的mybatis 最一开始我的批量插入其实是个伪批量 是类似吧很多条insert into语句 直接拼成一条 然后直接运行 发现这样的效率真的是十分低 我做测试时285条数据 插入一次需要10S多
  • uniapp添加.gitignore以及不生效解决办法

    一 第一次新建 gitignore 首先进入项目 命令行新建 gitignore文件 touch gitignore 然后编辑器打开 进入到项目中新建的 gitignore 文件 复制粘贴以下 node modules project un
  • C++实现鼠标点击其他程序

    1 主要是SendInput函数 代码如下 初始化 INPUT input 0 input type INPUT MOUSE dx dy代表的是进行点击的坐标 下面显示的是 950 150 input mi dx static cast
  • 【Proteus仿真】555组成的多谐振荡器电路

    Proteus仿真 555组成的多谐振荡器电路 Proteus仿真演示 多谐振荡器电路 多谐振荡器电路是一种矩形波产生电路 属于数字电路 三极管不工作在放大线性区 这种电路不需要外加触发信号便能连续地 周期性地自行产生矩形脉冲 该脉冲是由基
  • Stable Diffusion:ChatGPT与AI绘画,引领艺术的未来

    人工智能 AI 的快速发展正在为各个领域带来革命性的变化 其中包括艺术与创意领域 AI绘画是一种将人工智能技术与艺术创作相结合的新兴范式 通过深度学习和生成对抗网络 GAN 等技术 AI绘画可以生成各种富有创意和想象力的艺术作品 本文将探讨
  • python - __str__ 和 __repr__

    内建函数str 和repr representation 表达 表示 或反引号操作符 可以方便地以字符串的方式获取对象的内容 类型 数值属性等信息 str 函数得到的字符串可读性好 故被print调用 而repr 函数得到的字符串通常可以用
  • Docker+docker-compose+nginx部署已有项目

    项目背景 在异地服务器拷docker相关项目到新的服务器 具体操作 1 新服务器安装好docker 2 新服务器安装好docker compose 3 从老服务器拷贝镜像到新服务器 4 新服务器导入镜像 5 构建项目地址挂载目录 找到doc
  • 用U盘作启动盘装Windows10系统整套流程 纯净版(不用其他乱七八糟的软件)(macOS适用)

    简介 本人的电脑是MacBook Air 2014年版的 因为内存小而且文件杂乱 所以一下子都给格式化了 但是要用Mac自带的恢复系统的话需要连接校园网 连接校园网又需要打开网页输入账号和密码 我们学校的校园网是这样的 所以只能用U盘作为格
  • gqrx编译过程记录

    gqrx编译过程记录 目标 环境 编译 下载源代码 建立编译位置 修改CMakefile txt中的模块 编译安装 运行界面 没有更多 目标 在ubuntun 20 04桌面版编译gqrx 通过USRP 205mini实现收音机功能 环境
  • 【解决】docker容器怎么使用宿主机的IPv6地址

    在IPv4时代 我们对外访问都是端口映射 都没有公网IP 但是在IPv6时大家都有公网IP 可能需要容器地址和主机地址一致 可以在docker run时使用参数 network host 则此容器网络和宿主机一致 docker run ne
  • AQS详解

    AQS详解 文章目录 AQS详解 AQS简单介绍 AQS原理 AQS原理概览 AQS对资源的共享方式 AQS定义两种资源共享方式 Exclusive 独占 Share 共享 AQS底层使用了模板方法模式 Semaphore 信号量 Coun
  • 浅谈可重入锁

    一 可重入锁 递归锁 1 概念 同一个线程在外层方法获取锁的时候 再进入该线程的内层方法会自动获取锁 前提是 锁对象是同一个对象 不是因为之前已经获取过还没有释放而阻塞 2 java中的ReentrantLock和synchronied都是
  • 关于Gdi+和GdiplusStartup

    GDI 实际上是一组类的定义 封装了gdi 的几乎所有API 当然使用方法就要从这些 例子 里边寻找了 本文正是尝试用GDI 写一个纯SDK的程序 语言自然是我最喜欢的语言WIN32ASM 这个程序很简单 就是用GDI 画了一条直线 算是抛
  • HCIA-FusionCompute华为企业级虚拟化

    一 云计算 按需付费 集中资源对外提供服务 1 云本身没有资源 云是资源整合者 整合底层的所有计算机资源 cpu 内存 磁盘等 云计算是一种模型 它可以实现随时随地 随需应变地从可配置计算资源共享池中获取所需的资源 例如 网络 服务器 存储
  • BigDecimal 问题小结

    BigDecimal 加法 add 函数 乘法multiply 函数 除法divide 函数 绝对值abs 函数 减法subtract 函数 ROUND CEILING 向正无穷方向舍入 ROUND DOWN 向零方向舍入 ROUND FL
  • 【Redis】新增数据结构

    BitMap位图 Redis提供了Bitmaps这个 数据类型 可以实现对位的操作 1 Bitmaps本身不是一种数据类型 实际上它就是字符串 key value 但是它可以对字符串的位进行操作 2 Bitmaps单独提供了一套命令 所以在
  • RabbitMQ与SpringBoot整合实战

    SpringBoot整合RabbitMQ SpringBoot与RabbitMQ集成非常筒単 不需要做任何的额外设置只需要两步即可 step1 引入相关依赖 spring boot starter amqp step2 対applicati