9.延迟队列

2023-05-16

延迟队列

延迟队列的概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

队列TTL实现延迟队列

创建两个队列 QAQB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

在这里插入图片描述

创建springboot项目,修改pom.xml文件

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
	</dependencies>

修改配置文件application.properties

server.port=8080

spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

添加rabbitmq的配置类TtlQueueConfig

@Configuration
public class TtlQueueConfig {

    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";

    public static final String Y_DEAD_LETTER_EXCHANGE ="Y";
    public static final String DEAD_LETTER_QUEUE ="QD";


    @Bean("xExchange")
    public DirectExchange xExchange(){
        return  new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange(){
        return  new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }


    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key","YD");
        params.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(params).build();
    }


    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key","YD");
        params.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(params).build();
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }


    @Bean("queueD")
    public Queue queueD(){
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

添加生产者代码

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
    }
}

添加消费者代码

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message){
        String msg = new String( message.getBody());
        log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg);
    }
}

发起一个请求localhost:8080/ttl/sendMsg/hello-world

在这里插入图片描述

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求

延迟队列优化

在这里新增一个队列QC,绑定关系如下,该队列不设置TTL时间

在这里插入图片描述

创建配置类MsgTtlQueueConfig

@Component
public class MsgTtlQueueConfig {

    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String QUEUE_C = "QC";


    @Bean("queueC")
    public Queue queueC(){
        Map<String,Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
    }

    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

添加消息生产者代码

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
  log.info("当前时间:{},发送一条消息给{}毫秒TTL队列:{}",new Date(),ttlTime,message);
  rabbitTemplate.convertAndSend("X","XC",message,correlationData ->{
    correlationData.getMessageProperties().setExpiration(ttlTime);
    return correlationData;
  });
}

发起请求

  • localhost:8080/ttl/sendExpirationMsg/hello-world-1/20000
  • localhost:8080/ttl/sendExpirationMsg/hello-world-2/2000

在这里插入图片描述

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

延迟插件实现延迟队列

如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列

安装延迟队列插件

下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

启动命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

图示

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下

在这里插入图片描述

案例演示

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中

添加配置文件代码

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME="delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME ="delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    @Bean
    public Queue delayedQueue(){
        return  new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange delayedExchange(){
        Map<String,Object> params = new HashMap<>();
        params.put("x-delayed-type","direct");
        return  new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,params);
    }
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

添加生产者代码

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message , @PathVariable Integer delayTime){
        rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingKey",message,correlationData->{
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("当前时间:{},发送一条消息给{}毫秒队列delayed.queue:{}",new Date(),delayTime,message);
    }

添加消费者代码

    @RabbitListener(queues = "delayed.queue")
    public void receiveDelayedQueue(Message message){
        String msg = new String( message.getBody());
        log.info("当前时间:{},收到延时队列信息:{}",new Date(),msg);
    }

发起请求

  • localhost:8080/ttl/sendDelayMsg/hello-world-1/20000
  • localhost:8080/ttl/sendDelayMsg/hello-world-2/2000

在这里插入图片描述

第二个消息先被消费掉,符合预期。

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

9.延迟队列 的相关文章

随机推荐

  • 卡尔曼滤波详细推导

    卡尔曼滤波 xff08 Kalman filtering xff09 是一种利用线性系统状态方程 xff0c 通过系统输入输出观测数据 xff0c 对系统状态进行最优估计的算法 xff0c 由于观测数据中包括系统中的噪声和干扰的影响 xff
  • ROS tf工具与消息查看命令

    TF工具坐标系统是一个基础理论 xff0c 但是涉及到多个空间的变换 xff0c 不容易进行想象所以TF工具给开发者调试提供很多方便 1 tf monitor xff1a 将当前的坐标系转换关系打印到终端控制台 rosrun tf tf m
  • melodic 打开gazebo出现[Err] [REST.cc:205] Error in REST request错误解决方法

    ROS melodic版本下打开gazebo出现 Err REST cc 205 Error in REST request错误解决方法 输入以下命令打开文件 sudo gedit ignition fuel config yaml 然后将
  • 技术资源汇总(一)

    1 Ubuntu技术论坛 xff1a https askubuntu com 2 树莓派资源 https www yahboom com study raspberry3B 密码 xff1a cf0p 汇总资料提取码 xff1a hdy7
  • docker常用命令

    1 配置docker阿里云镜像 1 打开daemon json文件 xff08 若没有此文件 xff0c 则创建 etc docker daemon json xff09 xff1a vi etc docker daemon json 2
  • 网络调试助手UDP广播问题

    用直接广播地址 xff08 192 168 xxx 255 端口 xff09 可以进行广播 xff1b 用受限广播地址 xff08 255 255 255 255 端口 xff09 显示没有指定有效的远程主机端口 xff0c 搞了好久发现是
  • “平衡小车之家”家的STM32F103最小系统源代码分享

    在网上寻找了好久 xff0c 因为他家的开发板自带有mpu6050模块 故想测试其精准度以及z轴漂移程度 发现也有很大的漂移 代码如下 main c部分 xff1a span class token macro property span
  • 使用PMW3901和VL53L1X 实现室内定点悬停

    使用PMW3901和VL53L1X 实现室内定点悬停 使用PMW3901 光流传感器进行水平方向定位Pixhawk连接PMW3901传感器PX4源代码加入PMW3901驱动后重新编译QGroundControl中的配置 使用气压计和VL53
  • 使用 QGroundControl 地面站更新 PixHawk飞控的Bootloader

    安装最新版本的PX4固件 启动QGroundControl并且使用USB连接到Pixhawk飞控 选择 Q icon gt Vehicle Setup gt Firmware sidebar 打开固件设置 安装最新版本的PX4固件 更新Bo
  • 自制DIY 机器狗 完全教程 - MIT猎豹Cheetah

    自制DIY 机器狗 完全教程 MIT猎豹Cheetah 背景结构设计模块化关节电机性能考虑关节结构 四足平台设计腿部设计身体设计脚部设计 硬件设计关节驱动器通信总线板供电系统 控制系统人工智能 背景 3年前 xff0c MIT开源了世界上跑
  • centos安装wxWidgets,erlang,RabbitMq

    centos安装wxWidgets erlang RabbitMq 默认已经安装了java环境 而安装RabbitMq需要安装erlang xff0c 安装erlang又需要安装wxWidgets 安装wxWidgets 更新系统 yum
  • 2.rabbitmq概述和helloworld

    rabbitmq概述 rabbitmq中的几个概念 BROKER 接收和分发消息的应用 xff0c RabbitMQ Server 就是 Message Broker Virtual Host 出于多租户和安全因素设计的 xff0c 把 A
  • 3.rabbitmq轮询和不公平分发

    rabbitmq轮询和不公平分发 rabbitmq轮询分发 rabbitmq默认是使用轮询来分发消息的 测试代码如下所示 生产者代码 span class token comment 生产者 task rabbitmq 轮询演示 span
  • 4.rabbitmq消息应答

    rabbitmq消息应答 概述 消息应答就是消费者在收到消息的时候 xff0c 在它接收到消息并处理完毕之后 xff0c 告诉rabbitmq它已经处理完了 xff0c rabbitmq可以删除这个消息了 消息应答的方式 channel b
  • 5.rabbitmq持久化

    rabbitmq持久化 队列的持久化 队列的持久化需要我们在声明的时候指定其持久化 使用durable 61 true来持久化队列 span class token comment 队列的持久化 span span class token
  • 关于双控阵列的实现原理的讨论

    xfeff xfeff http bbs chinaunix net forum viewthread tid 4140392 html 对于一个支持FC SAN的双控存储阵列 xff0c 对外号称active active xff0c 实
  • 6.rabbitmq中exchange的几种形式

    rabbitmq中exchange的几种形式 RabbitMQ 消息传递模型的核心思想是 生产者生产的消息从不会直接发送到队列 实际上 xff0c 通常生产 者甚至都不知道这些消息传递传递到了哪些队列中 相反 xff0c 生产者只能将消息发
  • 7.rabbitmq死信和死信队列

    rabbitmq死信和死信队列 概述 先从概念解释上搞清楚这个定义 xff0c 死信 xff0c 顾名思义就是无法被消费的消息 xff0c 字面意思可以这样理 解 xff0c 一般来说 xff0c producer 将消息投递到 broke
  • 8.rabbitmq发布确认

    rabbitmq发布确认 生产者将信道设置成 confirm 模式 xff0c 一旦信道进入 confirm 模式 xff0c 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID 从 1 开始 xff0c 一旦消息被投递到所有匹配的队
  • 9.延迟队列

    延迟队列 延迟队列的概念 延时队列 队列内部是有序的 xff0c 最重要的特性就体现在它的延时属性上 xff0c 延时队列中的元素是希望 在指定时间到了以后或之前取出和处理 xff0c 简单来说 xff0c 延时队列就是用来存放需要在指定时