RabbitMQ实现延迟消息【死信队列实现、插件实现】

2023-10-27

视频地址

之前一直没使用过RabbitMQ,最近有一个需求需要用到延迟消息,就简单的使用了一下,发现还是有蛮多坑的。

此篇文章只是RabbitMQ延迟消息相关内容,至于安装RabbitMQ等其它操作,参考百度。


一、什么是延迟消息

顾问思议所谓延迟消息就是延迟消息!!!


二、延迟消息实现方法

在RabbitMQ中实现延迟消息有下面几种方式。


2-1、基于插件实现

原生的RabbitMQ是不支持延迟消息的,我们可以先在MQ上安装一个插件然后再发送延迟消息。

优点:发送延迟消息比较简单,安装之后就相当于MQ支持了延迟消息

缺点:要安装插件


2-2、基于死信队列实现

可以理解成,给A队列发送一个设置了过期时间的消息(过期分为消息过期队列过期),但是不给A队列设置消费者,这个A队列就是死信队列(其实就是一个普通队列,不给它设置消费者而已)。

等到了时间还没有被消费,这个消息就会被投递到配置的队列,我们可以给这个队列设置一个消费者,然后去消费这个消息就可以达到延迟的目的。

绿色的线表示正常的消息流程,红色表示投递到死信队列流程
在这里插入图片描述

注:

  1. 在创建A队列的时候需要给它设置死信交换机和死信路由key(如果不设置,消息无法消费它怎么知道投递给哪个队列呢?)
  2. 当消息和队列同时设置了过期时间,当以最小的为准

优点: 听起来牛X,不需要安装插件

缺点:写起来复杂,还浪费一个队列


2-3、基于阿里云的RabbitMQ

一般都是在自己的服务器上去安装,但有的公司也会去买阿里云的RabbitMQ

阿里云版的除了上述的两种还额外的支持了延迟消息,你只要设置消息头即可

官方文档 https://help.aliyun.com/document_detail/148083.html#section-rpd-mjh-kee

在这里插入图片描述


优点:什么都不用做,直接用

缺点:要钱


三、代码实现


3-1、插件实现


3-1-1、安装插件

自行百度,如果你没有安装插件,使用这个延迟类型的交换机会报错

Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)

3-1-2、配置延迟交换机

// 配置延时交换器
@Bean
public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delay_exchange_name", "x-delayed-message",true, false,args);
}

3-1-3、发送延迟消息

消息都是一样的,在下面演示


3-2、死信队列实现

插件和阿里云的实现方式都很简单,一个是在配置交换机的时候设置一个属性,一个是直接发消息的时候设置过期时间,就不把全部的代码写出来了。

但是死信队列相对来说复杂一些,这里就把全部的代码罗列出来帮助你理解。


3-2-1、CityMapQueueConfig

队列、交换机、队列和交换机绑定配置

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CityMapQueueConfig {
	
	// 定义普通队列和死信队列的名字
    public static final String CITY_MAP_QUEUE = "city_map_queue";
    public static final String CITY_MAP_DEAD_QUEUE = "city_map_dead_queue";

	// 定义普通交换机和死信交换机的名字
    public static final String CITY_MAP_DEAD_NAME = "city_map_dead_exchange";
    public static final String CITY_MAP_NAME = "city_map_exchange";

	// 定义普通路由key和死信路由key的名字
    public static final String CITY_MAP_ROUTE_KEY = "city_map_route_key";
    public static final String CITY_MAP_ROUTE_DEAD_KEY = "city_map_route_dead_key";
	
	// 创建普通队列
    @Bean
    public Queue cityMapQueue() {
        return QueueBuilder
                .durable(CITY_MAP_QUEUE)
                .build();
    }
	
	// 创建死信队列
    @Bean
    public Queue cityMapDeadQueue() {

        return QueueBuilder
                .durable(CITY_MAP_DEAD_QUEUE)
                // 给死信队列配置 死信交换机 和 死信路由key
                .withArgument("x-dead-letter-exchange", CITY_MAP_NAME)
                .withArgument("x-dead-letter-routing-key", CITY_MAP_ROUTE_KEY)
                // 设置队列过期时间
//                .withArgument("x-message-ttl", 1000 * 15)
                .build();
    }
	
	// 创建普通交换机
    @Bean
    public Exchange exchange() {
        return ExchangeBuilder
                .topicExchange(CITY_MAP_NAME)
                .durable(true)
                .build();
    }
	
	// 创建死信交换机
    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder
                .topicExchange(CITY_MAP_DEAD_NAME)
                .durable(true)
                .build();
    }

	// 绑定死信交换机和队列
    @Bean
    public Binding deadBinding() {
        return BindingBuilder
                .bind(cityMapDeadQueue())
                .to(deadExchange())
                .with(CITY_MAP_ROUTE_DEAD_KEY)
                .and(null);
    }

	// 绑定普通交换机和队列
    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(cityMapQueue())
                .to(exchange())
                .with(CITY_MAP_ROUTE_KEY)
                .and(null);
    }

}


3-2-2、CityMapConsumer

消费者,只是需要消费一个队列就好了,死信队列无需设置消费者

import cn.ideamake.yxproperty.docking.mq.config.CityMapQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 城市地图消费者
 *
 * @author 陶梓洋
 * @date 2022/03/22
 */
@Component
@Slf4j
public class CityMapConsumer {

    @RabbitListener(queues = CityMapQueueConfig.CITY_MAP_QUEUE)
    @RabbitHandler
    public void handler(Message message) {
        log.info("接受到MQ异步消息");
        // TODO
        
    }
}

3-2-3、RabbitMqProvider

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;


@Component
@Slf4j
public class RabbitMqProvider {

    @Resource
    private AmqpTemplate rabbitMqTemplate;


    /**
     * 发送延迟消息
     * @param exchange  死信交换机
     * @param routerKey 死信路由key
     * @param jsonParams json格式的消息
     * @param messageId 唯一消息id
     * @param delay  延迟时间 单位秒
     */
    public void sendMessage(String exchange, String routerKey, String jsonParams,String messageId, int delay) {
        log.info("mq发送异步延迟消息:{}", jsonParams);
        if (StringUtils.isNotBlank(exchange) && StringUtils.isNotBlank(routerKey)) {
            delay = delay * 1000;
            MessageProperties props = MessagePropertiesBuilder.newInstance()
                    .setMessageId(messageId)
                    .setExpiration(String.valueOf(delay))
                    .build();
            rabbitMqTemplate.convertAndSend(exchange,
                    routerKey,
                    MessageBuilder.withBody(jsonParams.getBytes(StandardCharsets.UTF_8)).andProperties(props).build());

        } else {
            log.warn("参数不完整,未发送MQ消息的,exchange:{},routerKey:{}", exchange, routerKey);
        }
    }
}

3-2-4、使用

@Resource
private RabbitMqProvider rabbitMqProvider;

rabbitMqProvider.sendMessage(CityMapQueueConfig.CITY_MAP_DEAD_NAME, CityMapQueueConfig.CITY_MAP_ROUTE_DEAD_KEY,JSONObject.toJSONString(callTmpTaskDTO),callTmpTaskDTO.getCityMapTaskDetail().getTaskItemNo(),60);

3-3、阿里云版

直接在消息里面设置一个请求头就好了

MessagePropertiesBuilder.newInstance()
                .setMessageId(messageId)
                .setHeader("delay", l + "")
                .build();
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ实现延迟消息【死信队列实现、插件实现】 的相关文章

随机推荐

  • 数据绑定多记录 Web 服务器控件

    Visual Basic 和 Visual C 概念 数据绑定多记录 Web 服务器控件 多记录控件 例如 Repeater DataList DataGrid ListBox CheckBoxList 和 RadioButtonList
  • js中时间转换为date型

    若显示为YYYY MM DD HH mm ss格式 调用如下方法 datetimeFormat longTypeDate 若显示为YYYY MM DD格式 调用如下方法 dateFormat longTypeDate Js中具体方法如下 时
  • 学习笔记(109):R语言入门基础-text函数

    立即学习 https edu csdn net course play 24913 285853 utm source blogtoedu text函数 text x y labels x y是数据向量 labels可以是整数 也可以是字符
  • 华师大版数学分析下知识点总结

    本篇为数分专栏的索引 考前复习 考前主要知识点总结 数分下例题 知识点 11反常积分 反常积分 12章数项级数 数项级数收敛判别 13章函数列和函数项级数 函数列和函数项级数的收敛判别 四个收敛的关系 14章幂级数 幂级数知识点 15章傅里
  • 程序环境和预处理

    目录 一 程序的翻译环境和执行环境 二 详解编译 链接 2 1 翻译环境 2 2 编译本身也分为几个阶段 2 3 运行环境 三 预处理详解 3 1 预定义符号 3 2 define 定义标识符 3 3 define 定义宏 3 4 defi
  • 使用SeruTek超高速TDC 测量PCIE spread spectrum clock

    目录 SeruTek TDC 简介 超高速TDC评估与测试 测试平台 PCIE Spread Spectrum Clock SSC 简介 测试目的 测试内容 测试结果汇总 第一轮测试 单频点测试 扩频测试 第二轮测试 单频点测试 扩频测试
  • nginx反向代理,request.getServerName()的问题

    前几天配置了nginx的反向代理 可是有个问题 在项目中 写request getServerName 的时候 总是返回的127 0 0 1 这个地址 折腾的好久 今天搜了搜 发现是配置的原因 记载一下 我以前的配置 location pr
  • 服务器环境配置(CentOS7)

    文章目录 虚拟机网络配置 Java8安装配置 Scala安装配置 MySQL安装配置 Redis安装配置 Nginx安装配置 Zookeeper安装配置 Kafka安装配置 ElasticSearch 安装配置 ElasticSearch
  • mysql 收集状态和性能数据的脚本来自于高性能mysql

    mysql 收集状态和性能数据的脚本 bin sh x INTERVAL 10 PASSWORD uroot proot PREFIX INTERVAL sec status RUNFILE home benchmarks running
  • Element-ui 之 解决后端返回大量数据时页面卡顿或者反应慢的问题(懒加载以及下拉框回显未加载数据)

    在我们实际开发中 常常需要优化以下页面 加快页面的响应速度 下面来介绍一下 前端在使用 Element ui 下拉框时使用懒加载来解决页面卡顿或者响应慢的问题 场景描述 场景描述一 当后端给前端返回的数据有很多成千上万个数据时 我们如果直接
  • ADC转换后的值推算还原实际测量电量

    ADC转换后的值推算还原实际测量电量 1 确定ADC用几位表示 最大数值是多少 例如一个8位的ADC 最大值是0XFF 就是255 2 确定最大值时对应的参考电压值 一般而言最大值对应5V 具体需参考芯片ADC模块的说明 寄存器有对于输入信
  • 完全背包算法——蓝桥杯——(C语言)

    问题描述 有一個背包 容量為M 有N種物品 每種物品有其體積Wi與價值Vi 將這些物品的一部分放入背包 每種物品可以放任意多個 要求總體積不超過容量 且總價值最大 输入格式 第一行為N M 之後N行 每行為Wi Vi 输出格式 一個數 為最
  • Unity jobsystem 和 burst编译器代码演示及效率测试

    Unity jobsystem 和 burst编译器代码演示及效率测试 最近看了相关内容做了个测试 直接上代码 using System Collections using System Collections Generic using
  • 一句话木马原理介绍和中国菜刀原理的介绍

    菜刀 c刀 蚁剑都使用过 而且他们的方法基本上差不多 今天突然心血来潮写来研究一下一句话木马和菜刀的原理 下面的原理是自己看其他大佬写的和自己总结的一些介绍 希望能够帮助老表们 一句话木马原理 我这里就分析一下PHP代码 其他代码基本上差不
  • 简单编程:屏幕找图找色找字,自动化操作

    屏幕找字 使用系统自带组件 其实 Windows 10 11 系统自带的一个强大 免费的屏幕图像文字识别组件 调用该组件生成的 EXE 文件体积很小 下面我们用 aardio 编程语言写个例子 代码很简单 先上图看效果 请复制下面的源代码粘
  • 云服务器搭建网站域名要备案吗,域名绑定服务器需要备案吗

    域名绑定服务器需要备案吗 内容精选 换一换 一个网站可以绑定多个域名 注册多个不同的域名 并且将这些域名绑定到同一个网站上 可以让互联网访问者更容易找到该网站 增加网站的推广效果 不过需要注意的是 一个网站绑定多个域名 会分散网站的权重 可
  • 基于Mysql+Vue+Django的协同过滤和内容推荐算法的智能音乐推荐系统——深度学习算法应用(含全部工程源码)+数据集

    目录 前言 总体设计 系统整体结构图 系统流程图 运行环境 Python 环境 MySQL环境 VUE环境 模块实现 1 数据请求和储存 2 数据处理 计算歌曲 歌手 用户相似度 计算用户推荐集 3 数据存储与后台 4 数据展示 系统测试
  • stm32 pwm输出异常及解决过程

    pwm输出是stm32最常用的外设之一 我比较习惯使用库函数配置 我通常移植做过的工程中的程序的配置代码 然后改一改相应的参数 配置方法也十分简单 即使很简单 但对于初学者有时候还是很容易出错 有时候会一点波形没有输出 在找不到其他原因的情
  • 阿里前端实习电话面(一面)(被捞两次)

    学习前端时长一年半 以下是电话面试阿里前端被问的一些知识点 都是下来回顾的 某一天我接到了阿里的hr电话 约面试时间 我以为阿里和其他公司一样会发邮件给我 所以一直在等 结果我在实验室接到了电话 说今天面试 我说没收到邮件 可以等我半小时
  • RabbitMQ实现延迟消息【死信队列实现、插件实现】

    视频地址 之前一直没使用过RabbitMQ 最近有一个需求需要用到延迟消息 就简单的使用了一下 发现还是有蛮多坑的 此篇文章只是RabbitMQ延迟消息相关内容 至于安装RabbitMQ等其它操作 参考百度 一 什么是延迟消息 顾问思议所谓