RabbitMQ进阶-Queue队列详解-延迟队列
1.延迟队列场景
1.1 场景
一般延迟队列用于特定事件发生后隔一段时间需要做特定处理的场景,下面举几个常见的栗子
1.电商系统中,若用户下单后30min不支付,自动取消订单
2.用户登录APP浏览特定商品20min后还没下单,自动推送商品评测信息的消息
3.调用第三方接口后,过30s去查询接口调用状态,比如简单的掉第三方接口发短信,掉完运营商不会立马告知你短信发送成功还是失败、所以设置30s后去主动查询下短信状态,然后更新此条短信状态(成功、失败、待回执)等
我们本篇文章,模拟场景 设置30s的延迟队列,消息在队列中延迟30s后,再去处理消息,实现业务逻辑
2.延迟队列实现方式
Rabbitmq本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 通过Rabbitmq本身队列的特性TTL来实现,利用队列消息的存活时间来实现,设置队列的TTL消息存活周期 和死信交换机,延迟队列需要消息的存活时间TTL(Time To Live) 来丢弃消息,并使用Rabbitmq的死信交换机(Exchange)来负责消息的转发
- 在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT18.0及以上
3.TTL+Exchange实现延迟队列
这种方式实现的原理就是利用队列消息存活时间TTL(Time To Live) ,超时就会丢弃本条消息,然后丢弃到死信交换机,死信交换机根据路由信息RoutingKey去负责路由转发,转发到相应的消费者来实现 延迟后业务逻辑处理
!!!注意我们设置TTL的队列是用做存储消息30s的,他并没有消费者
真正的消费者是绑定在死信交换机上面的,通过死信交换机设置的RoutingKey来路由到目标队列
实现原理:
3.1 初始化死信交换机
构造一个DeadLetterExchange 名字exchange_dead 和一个 target队列 名字 queue_delay_target,target队列就是接收死信消息,由他的消费者来处理延迟30s后的相关业务
交换机枚举类 ExchangeTypeEnum
package delay;
public enum ExchangeTypeEnum {
DIRECT("exchange-direct-name", "direct"),
FANOUT("exchange-fanout-name", "fanout"),
TOPIC("exchange-topic-name", "topic"),
HEADER("exchange-header-name", "headers"),
UNKNOWN("unknown-exchange-name", "direct");
/**
* 交换机名字
*/
private String name;
/**
* 交换机类型
*/
private String type;
ExchangeTypeEnum(String name, String type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public static ExchangeTypeEnum getEnum(String type) {
ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();
for (ExchangeTypeEnum exchange : exchangeArrays) {
if (exchange.getName().equals(type)) {
return exchange;
}
}
return ExchangeTypeEnum.UNKNOWN;
}
}
死信交换机、死信队列定义
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
public class DelayDeadExchange {
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "exchange_dead";
/**
* 目标队列,就是真正有消费者监听,要处理业务的队列
*/
public static final String QUEUE_TARGET = "queue_delay_target";
/**
* 设置 rk=# 表示任意RK的消息过来,都可以路由到 dead_msg_quque这个队列
*/
public static final String RK_QUEUE_TARGET = "rk.queue_delay_target";
/**
* 声明死信队列信息
* <p>
* 死信队列需要将 死亡的消息路由到 目标队列,从而使目标队列的消费者进行消费处理相关业务
*
* @throws Exception
*/
public static void deadInit() throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,topic
*/
channel.exchangeDeclare(DEAD_EXCHANGE, ExchangeTypeEnum.DIRECT.getType());
channel.queueDeclare(QUEUE_TARGET, true, false, false, null);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.queue_delay_target
*/
channel.queueBind(QUEUE_TARGET, DEAD_EXCHANGE, RK_QUEUE_TARGET);
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
deadInit();
}
}
运行 deadInit(),初始化死信交换机及死信队列,查看下界面
3.2 生产者
生产者定义一个TTL队列 queue_delay_ttl 设置队列消息生命周期TTL:30s,路由RoutingKey :rk.ttl_queue_test 就是为了存放消息,让他超时死亡后,经过死信交换机路由转发
!!!注意该队列没有消费者,只有ttl参数及绑定的死信队列
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import static delay.DelayDeadExchange.*;
public class DelayQueueProducer {
/**
* 延时队列名字
*/
public final static String QUEUE_TTL = "queue_delay_ttl";
/**
* 设置延时队列的RK
*/
public final static String RK_QUEUE_TTL = "rk.ttl_queue_test";
/**
* 生产 Direct直连 交换机的MQ消息
*/
public static void produce() throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,direct
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/* 声明(创建)队列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue - 队列名
* durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
* exclusie - 是否排外的,仅限于当前队列使用
* autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
* arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
*/
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// !!!!! 注意这里绑定的 RoutingKey 是 死信队列根据RK要路由到目标队列的RoutingKey,所以要用目标队列的RoutingKey
arguments.put("x-dead-letter-routing-key", RK_QUEUE_TARGET);
arguments.put("x-message-ttl", 30000);
channel.queueDeclare(QUEUE_TTL, true, false, false, arguments);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.queue_delay_ttl
*/
channel.queueBind(QUEUE_TTL, ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TTL);
/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
* queuename - 队列信息
* props - 参数信息
* message 消息体 byte[]类型
*/
// 消息内容
String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TTL, null, message.getBytes());
System.out.println(" **** Producer Sent Message: [" + message + "]");
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
// //生产消息
produce();
}
}
运行produce() 生产1条消息,查看界面
TTL的队列中消息由1条,target中没有消息
3.3 消费者
30s后,生产者生产的消息,TTL生命周期到了,就死亡了,从DeadLetterExchange 路由到了Traget队列
看下Target队列,target队列中从 0 变成 了1条消息
消费者我们循环10次,让他等待消息,消费此消息
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import static delay.DelayDeadExchange.*;
public class DelayTargetConsumer {
public static void main(String[] argv) throws Exception {
Connection connection = null;
Channel channel = null;
try {
connection = MqConnectUtil.getConnectionDefault();
channel = connection.createChannel();
/*声明交换机 String exchange
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/*声明队列
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_TARGET, true, false, false, null);
//交换机和队列绑定String queue, String exchange, String routingKey
/**
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key
*/
channel.queueBind(QUEUE_TARGET, DEAD_EXCHANGE, RK_QUEUE_TARGET);
System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
/* 消息确认机制
* autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
* autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
* 并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈
* !!!!!! 注意这里是 false,手动确认
*/
channel.basicConsume(QUEUE_TARGET, false, consumer);
int count = 0;
while (count < 10) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" count:" + count + " **** Consumer->1 Received '" + message + "'");
doSomeThing(message);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
/**
* 模拟处理复杂逻辑:休眠100ms
*
* @param message
* @throws Exception
*/
public static void doSomeThing(String message) throws Exception {
//遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑
Thread.sleep(100);
}
}
运行consumer,接收到了此消息,队列清空
消费完毕,消息队列清零
4.安装插件实现延迟队列
RabbitMQ提供了插件,可以实现延迟队列,我们现在采用插件的方式来实现一下延迟队列
4.1 插件下载
去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/community-plugins.html
找到 GitHub: rabbitmq/rabbitmq-delayed-message-exchange 的地方,下载
4.2 插件安装
把下载的 rabbitmq_delayed_message_exchange-3.8.0.ez 文件 复制到Rabbitmq安装路径 下面的 plugins/目录下
切换到Rabbitmq安装目录 sbin下,执行命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
打开界面Rabbitmq,新建交换机 plugin_delay_exchange,可以看到多了一种交换机 x-delay-message类型
多了一种交换机类型x-delay-message类型
4.3 延迟交换机插件使用
生产消息
package delay;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import java.util.HashMap;
import java.util.Map;
public class PluginsDelayExchange {
/**
* 延迟队列
*/
public static final String PLUGIN_DELAY_QUEUE = "plugin_delay_queue";
/**
* 延迟队列RoutingKey
*/
public static final String RK_PLUGIN_DELAY_QUEUE = "rk.plugin_delay_queue";
/**
* 延迟队列交换机
*/
public static final String PLUGIN_DELAY_EXCHANGE = "plugin_delay_exchange";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare(PLUGIN_DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
channel.queueDeclare(PLUGIN_DELAY_QUEUE, true, false, false, null);
channel.queueBind(PLUGIN_DELAY_QUEUE, PLUGIN_DELAY_EXCHANGE, RK_PLUGIN_DELAY_QUEUE);
Map<String, Object> headers = new HashMap<>();
//延迟10s后发送
headers.put("x-delay", 10000);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
//延迟10s后 发送消息
channel.basicPublish(PLUGIN_DELAY_EXCHANGE, RK_PLUGIN_DELAY_QUEUE, props, "该消息将在10s后发送到队列".getBytes());
channel.close();
connection.close();
}
}
执行 produce,生产1条消息
4.4 查看结果
看下队列中的消息
所以说,插件的实现方式和死信队列的实现方式完全不同,一个是延迟发送、一个是延迟消费
- 插件看似立马发送,Produce进程已经结束了,过了10s后,队列才真正的有消息
- 死信队列TTL方式是立马发送,然后消息在队列中存活10sTTL时间,后被路由转发,由消费者消费
至此 死信队列讲完了
下一章 我们讲一下 RabbitMQ系列(十二)RabbitMQ进阶-消息确认机制之事务机制