在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息。
一、什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列。
消息成为死信的三种情况:
1)队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列;
2)消费者拒绝消费消息,并且不把消息重新放入原目标队列;
3)原队列存在消息过期设置,消息到达超时时间未被消费;
二、时间过期产生死信
生产者的主要配置 ,如下代码:
<!--
死信队列:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queuename="test_queue_dlx"id="test_queue_dlx">
<!-- 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!-- x-dead-letter-exchange:死信交换机名称-->
<entrykey="x-dead-letter-exchange"value="exchange_dlx"/>
<!-- x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entrykey="x-dead-letter-routing-key"value="dlx.hehe"></entry>
<!-- 设置队列的过期时间 ttl-->
<entrykey="x-message-ttl"value="10000"value-type="java.lang.Integer"/>
<!-- 设置队列的长度限制 max-length -->
<entrykey="x-max-length"value="10"value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--正常交换机-->
<rabbit:topic-exchangename="test_exchange_dlx">
<rabbit:bindings>
<rabbit:bindingpattern="test.dlx.#"queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queuename="queue_dlx"id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchangename="exchange_dlx">
<rabbit:bindings>
<rabbit:bindingpattern="dlx.#"queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
在测试类中,添加如下方法,进行测试:
/**
* 发送测试死信消息:
* 1. 过期时间
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会变成死信消息吗?");
}
运行测试,查看管理台界面:
三、队列长度限制产生死信
修改测试类,添加测试方法:
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会变成死信消息吗?");
//2. 测试长度限制后,消息死信
for (int i = 0; i <11; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会变成死信消息吗?");
}
}
运行测试方法进行测试:
四、消息拒收产生死信
在消费者工程创建 DlxListener:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class DlxListener implements ChannelAwareMessageListener{
@Override
public void onMessage(Message message, Channel channel)throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
配置文件中消费者的配置:
<rabbit:listener-containerconnection-factory="connectionFactory"acknowledge="manual">
<!--定义监听器,监听正常队列-->
<rabbit:listenerref="dlxListener"queue-names="test_queue_dlx"></rabbit:listener>
</rabbit:listener-container>
生产者测试代码:
/** * 发送测试死信消息
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
**/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会变成死信消息吗?");
//2. 测试长度限制后,消息死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会变成死信消息吗?");
// }
//3. 测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx",
"test.dlx.haha",
"我是一条消息, 我会变成死信消息吗?");
}
发送消息,运行程序,查看后台管理界面:
五、死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种:
1.丢弃,如果不是很重要,可以选择丢弃
2.记录死信入库,然后做后续的业务分析或处理
3.通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)