RabbitMQ(八)【高级 - 过期时间 TTL】

2023-10-29

八、RabbitMQ高级 - 过期时间 TTL


上一篇文章SpringBoot案例

概述

过期时间 TTL 表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对 消息和队列 设置 TTL,目前有两种方法可以设置

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间
  • 第二种方法是对消息进行单独设置,每条消息 TTL 可以不同

如果上述两种方式同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息队列的生存时间一旦超过设置的 TLL 值,就称为 dead message 被投递到死信队列,消费者将无法再收到该消息

8.1 队列属性设置过期时间

队列过期时间 - 代码测试

  1. springboot-order-rabbitmq-producer工程下的config包下,新建TTLRabbitMQConfiguration.java
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册direct模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange", true, false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        return new Queue("account.ttl.queue", true, false, false, args);
    } 
    // 3.完成绑定关系(队列)
    @Bean
    public Binding ttl_accountBinding(){
        return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
    }

}
  1. service包下的OrderServiceImpl.java新增 ttl 函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttl";

        /*
         *  #.account.#
         *  *.express.#
         *  sms.#
         */
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }
}

在这里插入图片描述

  1. 测试用例
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    // 选择该测试用例
    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }
}
  1. 查看web界面中的Queues队列

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

8.2 队列里的消息设置过期时间

队列消息过期时间 - 代码测试

  1. springboot-order-rabbitmq-producer工程下的config包下,在TTLRabbitMQConfiguration.java类添加以下内容
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange", true, false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        return new Queue("account.ttl.queue", true, false, false, args);
    }

    @Bean
    public Queue ttlMessage_accountQueue(){
        return new Queue("account.ttl.message.queue", true, false, false);
    }

    @Bean
    public Binding ttl_accountBinding(){
		// ...
    }

	// 绑定操作
    @Bean
    public Binding ttlMessage_accountBinding(){
        return BindingBuilder.bind(ttlMessage_accountQueue()).to(ttl_directExchange()).with("ttlmessage");
    }

}
  1. service包下的OrderServiceImpl.java新增 ttlMessage 函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        //......
    }

    // ttlmessage
    public void createOrderTtlMessage(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttlmessage";

        // 给消息设置过期时间
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 参数类型是字符串
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, postProcessor);
    }
}
  1. 测试用例
package com.vinjcent.rabbitmq;

import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }

    // 使用该测试用例
    @Test
    void testTtlMessage() {
        orderService.createOrderTtlMessage("1","1",12);
    }
}

在这里插入图片描述

在这里插入图片描述

两者的区别在于,过期的消息队列,不会立即删除掉队列里的消息,而是将该队列中的消息放在死信队列中;而过期消息的消息队列则会立马删除掉该指定的消息

当两者同时存在时,谁的有效期时间短,就以该较短的时间作为标准

8.3 死信队列

概述

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为私信邮箱。当消息在一个队列中变成了死信(Dead Message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列

消息变成死信,有如下原因

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另外一个队列,即死信队列

想要使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange指定交换机即可

在这里插入图片描述

  1. springboot-order-rabbitmq-producer工程下的config包下,添加DeadRabbitMQConfiguration.java类(定义死信交换机和死信队列
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DeadRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("ttl_dead_exchange", true, false);
    }

    // 2.声明队列
    @Bean
    public Queue deadQueue(){
        return new Queue("direct.dead.queue", true);
    }

    // 3.完成绑定关系(队列)
    // 路由
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }

}
  1. 在该工程下修改service包下的TTLRabbitMQConfiguration.java类(为该队列和交换机绑定死信参数

package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TTLRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange ttl_directExchange(){
        return new DirectExchange("ttl_order_exchange",true,false);
    }


    // 2.声明队列
    @Bean
    public Queue ttl_accountQueue(){
        // 设置过期时间
        Map<String, Object> args = new HashMap<>();
        // 根据参数说明设置对应参数
        args.put("x-message-ttl",5000);
        // 配置死信交换机
        args.put("x-dead-letter-exchange","ttl_dead_exchange");
        // 配置死信队列的路由key
        args.put("x-dead-letter-routing-key","dead");   // direct模式需要配置路由key,而fanout模式不需要
        return new Queue("account.ttl.queue",true,false,false,args);
    }

    // 3.完成绑定关系(队列)
    // direct模式比fanout模式多了一个路由key
    @Bean
    public Binding ttl_accountBinding(){
        return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
    }


}


  1. 在该工程下修改service包下的OrderServiceImpl.java类下的createOrderTtl函数
package com.vinjcent.rabbitmq.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId, int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId, int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId, int num){
        //......
    }

    // ttl
    public void createOrderTtl(String userId, String productId, int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "ttl_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "ttl";

        /*
         *  #.account.#
         *  *.express.#
         *  sms.#
         */
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }

    // ttlmessage
    public void createOrderTtlMessage(String userId, String productId,int num){
        //......
    }




}

配置的死信队列参数可在web界面查看

在这里插入图片描述

在这里插入图片描述

  1. 运行测试用例
package com.vinjcent.rabbitmq;

import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }

    // 使用该测试用例
    @Test
    void testTtl() {
        orderService.createOrderTtl("1","1",12);
    }

    @Test
    void testTtlMessage() {
        orderService.createOrderTtlMessage("1","1",12);
    }
}

在这里插入图片描述

报错原因:由于已创建的队列,在原来的基础上进行代码的修改,这个队列不会被更改或者覆盖,只能通过删除之后重新运行

在这里插入图片描述

在实际开发中,并不推荐直接从线上直接删除,这样做的风险比较大。可以通过重新创建一个队列,将要删除的队列的死信队列与之绑定,将死信队列里的信息用生产者发送到新的队列当中,实现转换/迁移的过程

在这里插入图片描述

在这里插入图片描述

对于参数的设置,可以在web界面查看异常队列规则,从而限制队列消息的推入
在这里插入图片描述

下一篇文章内存磁盘的监控

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ(八)【高级 - 过期时间 TTL】 的相关文章

随机推荐

  • 时间序列匹配之dtw的python实现(一)

    简介 Dynamic Time Warping 动态时间序列扭曲匹配 简称DTW 是时间序列分析的经典算法 用来比较两条时间序列之间的距离 发现最短路径 笔者在github上搜索dtw时发现了两个比较经典的库 dtw和dtw python
  • 最强英语翻译技巧-IT人士专用

    最强英语翻译技巧 IT人士专用 github传送门
  • UE4导入插件后编译无法通过,提示缺少typeinfo.h的坑。(VS2019踩坑记录)

    先说结论 VS有时候不用急着更新最新版本 包括工具也是 都是坑 再出问题我换17用了 参考链接 http papalqi cn 201910 cid 406 html https developercommunity visualstudi
  • 博客转站了~

    由于 CSDN 的博客功能经常有问题 所以把博文站点搬迁到 https www runzhliu cn 欢迎订阅 评论
  • Unity_物体旋转方法归纳

    0 旋转的三种方式 1 矩阵旋转 2 欧拉旋转 会造成万向节锁问题 3 四元数旋转 可避免万向节锁现象 1 方法1 Transform rotation Transform localRotation 旋转角度 void Update if
  • VC获取Unix时间戳

    获取Unix时间戳 std wstring GetUnixTimeStamp void FILETIME ft 0 SYSTEMTIME st 0 ULARGE INTEGER ull 0 GetSystemTime st SystemTi
  • 【数据分析】如何量化时间序列之间的相似性?

    如何量化时间序列之间的相似性 逝者如斯夫 不舍昼夜 时间不会停止 世界上的一切都在不断运动 抛开物理学或哲学的概念不提 几乎所有东西都可以被描述为一系列的事件 对数据更感兴趣的人来说 它们又可以被看做是一系列的测量 这就是我们所说的时间序列
  • 外卖店优先级问题(双指针降低时间复杂度)

    外卖店优先级问题 文章目录 外卖店优先级问题 问题详情 问题分析 1 普通思路 1 1输入储存问题 1 2 遍历存储的数据 1 3遍历 st N 数组 获得最终值 2 双指针 时间复杂度 O n 代码 具体的解析都在注释当中 其中困惑的点都
  • 面试题:InnoDB中一棵B+树能存多少行数据?

    作者 李平 原文地址 www cnblogs com leefreeman p 8315844 html 一 InnoDB一棵B 树可以存放多少行数据 InnoDB一棵B 树可以存放多少行数据 这个问题的简单回答是 约2千万 为什么是这么多
  • 怎么让qt程序在Android系统上运行

    我们都知道支持跨平台是Qt的特点之一 也是比较重要的特点 最近在学习Qt的开发 就在想 它是怎么支持跨平台的呢 我平时的程序都是windows系统下开发和运行 语言用c 那怎么把我用c 写的qt程序运行在android平台上呢 带着这个问题
  • 利用Vulnhub复现漏洞 - Discuz!X ≤3.4 任意文件删除漏洞

    Discuz X 3 4 任意文件删除漏洞 Vulnhub官方复现教程 漏洞原理 复现漏洞 启动环境 漏洞复现 Formhash Cookie 发送数据包 原文 Vulnhub官方复现教程 https vulhub org environm
  • angular.js官方文档的一些说明

    由于angular phonecat那个项目更新了 中文翻译资料很多还是之前版本的 用起来会有问题 自己也碰到一些坑 在此说明下 windows 1 node js官网下载node的安装包 http nodejs org dist v0 1
  • PWN学习---1、基本漏洞-缓冲区溢出

    环境 Ubuntu12 04 32位 一 存在漏洞代码 vuln c include
  • p值小于0.05拒绝还是接受_计算p-值和第二类错误

    专栏里有一篇 两类错误弃真与取假 对显著性水平 P value 第二类错误 功效进行了解释 这里换种行文说 并给出具体计算例子 用来增加对这几个概念的认识和理解 首先我们要清楚我们手上只有一份容量为 的样本 实践中希望通过该份样本进行统计推
  • 老司机教你如何用Unity和Cardboard把3D游戏做成VR游戏

    原网址 随着Oculus宣布1月6日开启预售 2016年很可能成为VR游戏元年 但很多的调研显示 手游设备才是市场增长的关键 SuperData发布的报告显示 2016年全球VR游戏市场规模预计在51亿美元左右 消费者设备安装量在3890万
  • 你一定要知道的那些以太坊术语解释

    block 区块 包含交易 0或多个 父区块 parent hash及其他数据的数据包 在区块链中 除创世区块的每个区块都要包含其父区块的hash 整个区块链包含一个网络的所有交易历史 注意 有些基于区块链的加密货币自称 账本 而非 区块链
  • HashMap底层原理分析

    HashMap的实现原理在Jdk1 7之前底层实现是基于数组 链表的形式 即通过HashMap的key值进行Hash 然后对容量 默认16 负载因子 默认是0 75 进行取余获取到其桶的位置 比如某一个key的hash值是18 容量为为16
  • C语言if语句,for循环相关年月日应用

    文章目录 一 if双分支语句变成单分支语句 1 if单分支语句 2 if双分支语句 3 if双分支语句改成单分支语句 二 采用冒泡排序法对n个数据排序 编写n个元素数组 用冒泡排序法进行排序 三 输入一个年份 判断是不是闰年 1 满足两个条
  • 俞敏洪演讲:创业不需要有钱 只需想着挣钱

    各位朋友大家下午好 上午我坐下面我发现我跟郭凡生郭总很大不同 他认为他自己很聪明 而我恰恰认为自己不聪明 我呢认为自己也不一定笨 他认为他很有吸引力 确实是对三四十岁的中年人来说 郭总很有创业热情 因为这个年龄的人 能够用那样的语气语调 两
  • RabbitMQ(八)【高级 - 过期时间 TTL】

    八 RabbitMQ高级 过期时间 TTL 上一篇文章 SpringBoot案例 概述 过期时间 TTL 表示可以对消息设置预期的时间 在这个时间内都可以被消费者接收获取 过了之后消息将自动被删除 RabbitMQ可以对 消息和队列 设置