延迟队列
延迟队列的概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
队列TTL实现延迟队列
创建两个队列 QA
和 QB
,两者队列 TTL
分别设置为 10S 和 40S,然后在创建一个交换机 X
和死信交 换机 Y
,它们的类型都是 direct
,创建一个死信队列 QD
,它们的绑定关系如下:
创建springboot
项目,修改pom.xml
文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
修改配置文件application.properties
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
添加rabbitmq
的配置类TtlQueueConfig
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE ="Y";
public static final String DEAD_LETTER_QUEUE ="QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(params).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("queueB")
public Queue queueB(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(params).build();
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
添加生产者代码
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
}
}
添加消费者代码
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg);
}
}
发起一个请求localhost:8080/ttl/sendMsg/hello-world
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求
延迟队列优化
在这里新增一个队列QC,绑定关系如下,该队列不设置TTL时间
创建配置类MsgTtlQueueConfig
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
添加消息生产者代码
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条消息给{}毫秒TTL队列:{}",new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
}
发起请求
localhost:8080/ttl/sendExpirationMsg/hello-world-1/20000
localhost:8080/ttl/sendExpirationMsg/hello-world-2/2000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
延迟插件实现延迟队列
如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列
安装延迟队列插件
下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
启动命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
图示
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
案例演示
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中
添加配置文件代码
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME="delayed.queue";
public static final String DELAYED_EXCHANGE_NAME ="delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> params = new HashMap<>();
params.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,params);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
添加生产者代码
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message , @PathVariable Integer delayTime){
rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingKey",message,correlationData->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条消息给{}毫秒队列delayed.queue:{}",new Date(),delayTime,message);
}
添加消费者代码
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到延时队列信息:{}",new Date(),msg);
}
发起请求
localhost:8080/ttl/sendDelayMsg/hello-world-1/20000
localhost:8080/ttl/sendDelayMsg/hello-world-2/2000
第二个消息先被消费掉,符合预期。
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)