7.rabbitmq死信和死信队列

2023-05-16

rabbitmq死信和死信队列

概述

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.rejectbasic.nack)并且 requeue=false.

死信的流程图

在这里插入图片描述

TTL过期

什么是TTL,TTL是Rabbitmq中的一个消息或者队列的属性,表明一条消息或者队列中的所有消息的最大存活时间,单位是毫秒。

如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

  • 针对每一条消息设置TTL

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    
  • 队列设置TTL

    Map<String,Object> params = new HashMap<>();
    params.put("x-message-ttl",1000);
    

区别:

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。

生产者代码

/**
 * 死信 TTl过期 生产者
 */
public class TtlProducer {
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);
        //设置消息的TTl时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(ExchangeNames.NORMAL,"normal-key",properties,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者1代码,启动之后关闭该消费者,使其接收不到消息

/**
 * 死信 TTL 消费者1
 */
public class TtlCustomer1 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明死信和普通交换机
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");

        //设置参数
        Map<String,Object> params = new HashMap<>();
        //正常队列设置死信交换机,key是固定的
        params.put("x-dead-letter-exchange",ExchangeNames.DEAD);
        //正常队列设置死信的 routing-key, key是固定的
        params.put("x-dead-letter-routing-key","dead-key");
        params.put("x-message-ttl",1000);

        //声明普通的队列
        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        //绑定队列和交换机
        channel.queueBind(normalQueue,ExchangeNames.NORMAL,"normal-key");
      
        System.out.println("TtlCustomer1 等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收绑定键:"+message.getEnvelope().getRoutingKey()+",消息 :"+receive);
        };
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
    }
}

此时,生产者未发送消息,消费者也没有接收到消息

在这里插入图片描述

生产者发送10条消息,此时正常队列有10条消息没有被消费

在这里插入图片描述

时间过去10秒,正常队列里面的消息没有被消费,消息进入了死信队列

在这里插入图片描述

消费者2代码

/**
 * 死信 TTL 消费者2
 */
public class TtlCustomer2 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明死信和普通交换机
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        //绑定队列和交换机
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");

        System.out.println("TtlCustomer2 等待接收死信队列消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            System.out.println("接收死信队列消息:"+message.getEnvelope().getRoutingKey()+",消息 :"+receive);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,consumerTag -> {});
    }
}

启动消费者2代码,发现死信队列中的消息被消费者2消费

队列达到最大长度

我们可以给一个队列设置最大的长度,当队列中的消息达到了最大长度之后,后面的消息就转入到死信队列中。

Map<String,Object> params = new HashMap<>();
params.put("x-max-length",6);

生产者代码

/**
 * 死信 队列最大长度 生产者
 */
public class MaxLengthProducer {
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(ExchangeNames.NORMAL,"normal-key",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者1代码

/**
 *
 * 死信 最大队列长度 消费者
 */
public class MaxLengthConsumer1 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明两个队列,一个普通队列,一个死信队列
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);

        Map<String,Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",ExchangeNames.DEAD);
        params.put("x-dead-letter-routing-key","dead-key");
        params.put("x-max-length",6);

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        channel.queueBind(normalQueue,ExchangeNames.NORMAL,"normal-key");

        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");

        System.out.println("MaxLengthConsumer1 等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("接收绑定键:"+message.getEnvelope().getRoutingKey()+",消息 :"+receive);
        };
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});

    }
}

启动消费者1,查看消费未发送时的状态

在这里插入图片描述

关闭消费者1,启动生产者,发送消息

在这里插入图片描述

消费者2代码

/**
 * 死信 最大队列长度 消费者
 */
public class MaxLengthConsumer2 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明死信交换机
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        //绑定队列和交换机
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");


        System.out.println("MaxLengthConsumer2 等待接收死信队列消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            System.out.println("接收死信队列消息:"+message.getEnvelope().getRoutingKey()+",消息 :"+receive);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,consumerTag -> {});
    }
}

启动消费者2,消费掉死信队列中的消息之后,如下图所示

在这里插入图片描述

消息被拒

普通队列拒绝接收消息之后,将消息转入死信队列,拒绝接收消息的方法如下所示

  • 第二个参数requeue:设置为false,代表拒接重新入队,该队列如果配置了死信队列那么将进入到死信队列中
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);

生产者代码

/**
 * 消息被拒 生产者
 */
public class RejectProducer {
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);
        for (int i = 1; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(ExchangeNames.NORMAL,"normal-key",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者1代码

/**
 *
 * 死信 消息被拒 消费者1
 */
public class RejectConsumer1 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明两个队列,一个普通队列,一个死信队列
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeNames.NORMAL, BuiltinExchangeType.DIRECT);

        Map<String,Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange",ExchangeNames.DEAD);
        params.put("x-dead-letter-routing-key","dead-key");

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        channel.queueBind(normalQueue,ExchangeNames.NORMAL,"normal-key");

        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");

        System.out.println("RejectConsumer1 等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            if (receive.equals("info5")){
                System.out.println("RejectConsumer1接收到消息:"+receive+",但是拒接接收了");
                //requeue 设置为false,代表拒接重新入队 该队列如果配置了死信队列那么将进入到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("RejectConsumer1接收到消息:"+receive);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});

    }
}

启动消费者1,队列信息如下所示

在这里插入图片描述

启动生产者,发送消息,info5被拒绝接收进入死信

在这里插入图片描述

消费者2代码

/**
 *
 * 死信 消息被拒 死信 消费者
 */
public class RejectConsumer2 {

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitUtil.getChannel();
        //声明死信交换机
        channel.exchangeDeclare(ExchangeNames.DEAD, BuiltinExchangeType.DIRECT);

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        //绑定队列和交换机
        channel.queueBind(deadQueue,ExchangeNames.DEAD,"dead-key");


        System.out.println("RejectConsumer2 等待接收死信队列消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String receive = new String(message.getBody());
            System.out.println("接收死信队列消息:"+message.getEnvelope().getRoutingKey()+",消息 :"+receive);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,consumerTag -> {});
    }
}

启动消费者2消费死信队列中的消息

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

7.rabbitmq死信和死信队列 的相关文章

随机推荐

  • 排序算法:冒泡排序和选择排序的思路,区别与优缺点。

    一 xff0c 冒泡排序 xff1a 冒泡排序的定义就不提了 xff0c 总结起来就一句话 xff08 划重点 xff09 xff1a xff0c 从左到右 xff0c 数组中相邻的两个元素进行比较 xff0c 将较大的放到后面 算法思路
  • ROS创建功能包并自定义消息

    ROS有时需要自定义消息 xff0c 本文叙述如何通过创建功能包并自定义消息 创建ROS工作空间具体实现 xff1a https blog csdn net qq 34911636 article details 100103448 创建一
  • 卡尔曼滤波详细推导

    卡尔曼滤波 xff08 Kalman filtering xff09 是一种利用线性系统状态方程 xff0c 通过系统输入输出观测数据 xff0c 对系统状态进行最优估计的算法 xff0c 由于观测数据中包括系统中的噪声和干扰的影响 xff
  • ROS tf工具与消息查看命令

    TF工具坐标系统是一个基础理论 xff0c 但是涉及到多个空间的变换 xff0c 不容易进行想象所以TF工具给开发者调试提供很多方便 1 tf monitor xff1a 将当前的坐标系转换关系打印到终端控制台 rosrun tf tf m
  • melodic 打开gazebo出现[Err] [REST.cc:205] Error in REST request错误解决方法

    ROS melodic版本下打开gazebo出现 Err REST cc 205 Error in REST request错误解决方法 输入以下命令打开文件 sudo gedit ignition fuel config yaml 然后将
  • 技术资源汇总(一)

    1 Ubuntu技术论坛 xff1a https askubuntu com 2 树莓派资源 https www yahboom com study raspberry3B 密码 xff1a cf0p 汇总资料提取码 xff1a hdy7
  • docker常用命令

    1 配置docker阿里云镜像 1 打开daemon json文件 xff08 若没有此文件 xff0c 则创建 etc docker daemon json xff09 xff1a vi etc docker daemon json 2
  • 网络调试助手UDP广播问题

    用直接广播地址 xff08 192 168 xxx 255 端口 xff09 可以进行广播 xff1b 用受限广播地址 xff08 255 255 255 255 端口 xff09 显示没有指定有效的远程主机端口 xff0c 搞了好久发现是
  • “平衡小车之家”家的STM32F103最小系统源代码分享

    在网上寻找了好久 xff0c 因为他家的开发板自带有mpu6050模块 故想测试其精准度以及z轴漂移程度 发现也有很大的漂移 代码如下 main c部分 xff1a span class token macro property span
  • 使用PMW3901和VL53L1X 实现室内定点悬停

    使用PMW3901和VL53L1X 实现室内定点悬停 使用PMW3901 光流传感器进行水平方向定位Pixhawk连接PMW3901传感器PX4源代码加入PMW3901驱动后重新编译QGroundControl中的配置 使用气压计和VL53
  • 使用 QGroundControl 地面站更新 PixHawk飞控的Bootloader

    安装最新版本的PX4固件 启动QGroundControl并且使用USB连接到Pixhawk飞控 选择 Q icon gt Vehicle Setup gt Firmware sidebar 打开固件设置 安装最新版本的PX4固件 更新Bo
  • 自制DIY 机器狗 完全教程 - MIT猎豹Cheetah

    自制DIY 机器狗 完全教程 MIT猎豹Cheetah 背景结构设计模块化关节电机性能考虑关节结构 四足平台设计腿部设计身体设计脚部设计 硬件设计关节驱动器通信总线板供电系统 控制系统人工智能 背景 3年前 xff0c MIT开源了世界上跑
  • centos安装wxWidgets,erlang,RabbitMq

    centos安装wxWidgets erlang RabbitMq 默认已经安装了java环境 而安装RabbitMq需要安装erlang xff0c 安装erlang又需要安装wxWidgets 安装wxWidgets 更新系统 yum
  • 2.rabbitmq概述和helloworld

    rabbitmq概述 rabbitmq中的几个概念 BROKER 接收和分发消息的应用 xff0c RabbitMQ Server 就是 Message Broker Virtual Host 出于多租户和安全因素设计的 xff0c 把 A
  • 3.rabbitmq轮询和不公平分发

    rabbitmq轮询和不公平分发 rabbitmq轮询分发 rabbitmq默认是使用轮询来分发消息的 测试代码如下所示 生产者代码 span class token comment 生产者 task rabbitmq 轮询演示 span
  • 4.rabbitmq消息应答

    rabbitmq消息应答 概述 消息应答就是消费者在收到消息的时候 xff0c 在它接收到消息并处理完毕之后 xff0c 告诉rabbitmq它已经处理完了 xff0c rabbitmq可以删除这个消息了 消息应答的方式 channel b
  • 5.rabbitmq持久化

    rabbitmq持久化 队列的持久化 队列的持久化需要我们在声明的时候指定其持久化 使用durable 61 true来持久化队列 span class token comment 队列的持久化 span span class token
  • 关于双控阵列的实现原理的讨论

    xfeff xfeff http bbs chinaunix net forum viewthread tid 4140392 html 对于一个支持FC SAN的双控存储阵列 xff0c 对外号称active active xff0c 实
  • 6.rabbitmq中exchange的几种形式

    rabbitmq中exchange的几种形式 RabbitMQ 消息传递模型的核心思想是 生产者生产的消息从不会直接发送到队列 实际上 xff0c 通常生产 者甚至都不知道这些消息传递传递到了哪些队列中 相反 xff0c 生产者只能将消息发
  • 7.rabbitmq死信和死信队列

    rabbitmq死信和死信队列 概述 先从概念解释上搞清楚这个定义 xff0c 死信 xff0c 顾名思义就是无法被消费的消息 xff0c 字面意思可以这样理 解 xff0c 一般来说 xff0c producer 将消息投递到 broke