简介
rabbitMQ延时任务的实现思想:
rabbitmq实现延时任务本质就是使用 "过期时间" 和 "死信队列"实现的,首先定义一个死信队列和死信队列的消费者,这个死信队列用来接收延时队列过期的消息,死信队列消费者用来接收到过期消息后就进行消费,在定义一个延时队列存储我们的信息,这个延时队列没有消费者,所以消息一直存在这个队列中,当我们设置了过期时间为24小时后,每个消息24小时候就会过期,进入到死信队列中,死信队列中有消费者,只要一过期就会被死信队列的消费者消费,就实现了延时任务
准备工作
这里不介绍如何安装rabbitmq和erlang了,如果有需要可以跳转下面的
windows环境下安装RabbitMQ(超详细)_windows安装rabbitmq_luckySnow-julyo的博客-CSDN博客
创建springBoot项目,下载依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
实现
方式一:队列设置过期时间(建议使用该方式)
生产者
我们首先创建一个死信队列
package com.qtt.mq.create.scheduled;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetter {
//创建死信的队列
@Bean
public Queue createDeadLetterQueue1(){
//queue(队列名,是否持久化)
return new Queue("dead-letter-queue-1",true);
}
//创建死信的交换机
@Bean
public DirectExchange createDeadLetterExchange1(){
//DirectExchange(交换机名称,是否持久化,是否自动删除)
return new DirectExchange("dead-letter-exchange-1",true,false);
}
//队列和交换价绑定
@Bean
public Binding createDeadLetterBinding1(){
return BindingBuilder.bind(createDeadLetterQueue1())
.to(createDeadLetterExchange1())
//routing-key
.with("111");
}
}
创建一个延时用的队列(该队列没有消费者,且要设置过期时间)
package com.qtt.mq.create.scheduled;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueue {
//创建队列,且指定队列的死信队列
@Bean
public Queue createDelayedQueue(){
//创建map用于定义队列的一些属性
Map<String, Object> map = new HashMap<>();
//指定队列的死信队列的交换机(x-dead-letter-exchange是固定的)
map.put("x-dead-letter-exchange","dead-letter-exchange-1");
//指定死信接收队列绑定的routing-key(x-dead-letter-routing-key)
map.put("x-dead-letter-routing-key","111");
//设置队列消息过期时间(如果我们通过消息设置过期时间,这个可以不定义,但建议使用队列过期)
map.put("x-message-ttl",60000);
//Queue(队列名称,是否持久化,不知道,不知道,队列属性)
return new Queue("delayed-send-queue",true,false,false,map);
}
//创建交换机
@Bean
public DirectExchange createDelayedExchange(){
//DirectExchange(交换机名称,是否持久化,是否自动删除)
return new DirectExchange("delayed-send-exchange",true,false);
}
//创建绑定
@Bean
public Binding createDelayedBinding(){
return BindingBuilder.bind(createDelayedQueue())
.to(createDelayedExchange())
//routing-key
.with("222");
}
}
将消息发送到我们的延时队列中
package com.qtt.mq.controller;
import com.qtt.mq.send.IsendMQService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class) //test自动装配为空的问题解决
public class mqController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void delayedTest(){
rabbitTemplate.convertAndSend("delayed-send-exchange","222","测试延时任务");
}
}
消费者
给我们延时任务的死信队列设置消费者进行消费
package com.qtt.mq.reception;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//一定要加入到spring容器中
@Component
public class DelayedReception {
//定义的监听队列(该处是监听的死信息队列)
@RabbitListener(queues = {"dead-letter-queue-1"})
public void delayedTest(String mess){
System.out.println(mess);
}
}
方式二:消息设置过期时间
和上面的思想一样,只是使用的方式不同
1. 延时队列的过期时间去掉(注意:如果已经指定了死信队列,要删除该队列,在重新创建,因为指定过死信队列后不能通过代码修改属性)
不去掉也行,但默认会使用最小的一个过期时间,所以可能会冲突
2. 定义消息时定义消息过期时间
package com.qtt.mq.controller;
import com.qtt.mq.send.IsendMQService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class) //test自动装配为空的问题解决
public class mqController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void delayedTest(){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message("这是一个消息测试死信队列".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delayed-send-exchange","222",message);
}
}