RabbitMQ系列(十)RabbitMQ进阶-Queue队列参数详解-死信交换机

2023-10-26

RabbitMQ进阶-Queue队列参数详解-死信交换机

1. Dead Letter Exchange 介绍

我们先新建一个 死信队列交换机 exchange-dead
在这里插入图片描述
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被拒绝(basic.reject / basic.nack),使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  2. 消息TTL过期,队列设置了x-message-ttl消息的生存周期,过期的消息会被丢弃到死信交换机中
  3. 队列达到最大长度,然后队列主动抛弃、丢弃的消息

以上三种情况的消息,将成为“死信”,“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2. 死信消息方式

先定义交换机类型,然后我们分别演示一下几种情况的死信队列
声明交换机信息

package dead;

public enum ExchangeTypeEnum {

    DIRECT("exchange-direct-name", "direct"),
    FANOUT("exchange-fanout-name", "fanout"),
    TOPIC("exchange-topic-name", "topic"),
    HEADER("exchange-header-name", "header"),
    UNKNOWN("unknown-exchange-name", "direct");

    /**
     * 交换机名字
     */
    private String name;
    /**
     * 交换机类型
     */
    private String type;

    ExchangeTypeEnum(String name, String type) {
        this.name = name;
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public String getType() {
        return type;
    }

    public static ExchangeTypeEnum getEnum(String type) {
        ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();
        for (ExchangeTypeEnum exchange : exchangeArrays) {
            if (exchange.getName().equals(type)) {
                return exchange;
            }
        }
        return ExchangeTypeEnum.UNKNOWN;
    }

}

2.1 消息被拒绝

我们生产一个消息,然后消费者Reject处理,然后看下消息处理方式
首先我们先理解一下 消息被拒绝丢弃

2.1.1 channel.basicNack 用法

channel.basicNack(deliveryTag, multiple, requeue);

  • deliveryTag:long - 消息投递的唯一标识,作用域为当前channel
  • multiple:boolean -是否启用批量确认机制
  • requeue:boolean - 消息处理失败是重新放回队列还是直接丢弃

下面我们看下用法

  1. channel.basicNack(8, true, true);
    表示deliveryTag=8之前未确认的多个消息都处理失败且将这些消息重新放回队列。
  2. channel.basicNack(8, true, false);
    表示deliveryTag=8之前未确认的多个消息都处理失败且将这些消息直接丢弃。
  3. channel.basicNack(8, false, true);
    表示deliveryTag=8的单条消息处理失败且将该消息重新放回队列。
  4. channel.basicNack(8, false, false);
    表示deliveryTag=8的单条消息处理失败且将该消息直接丢弃
2.1.2 channel.basicReject 用法

channel.basicReject(deliveryTag, requeue);
相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。

  1. channel.basicReject(8, true);
    表示deliveryTag=8的消息处理失败且将该消息重新放回队列
  2. channel.basicReject(8, false);
    表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
2.1.3 初始化死信队列

新建一个死信队列常量类

package dead;

public class DeadConst {

    /**
     * 死信交换机
     */
    public static final String DEAD_EXCHANGE = "exchange_dead";
    /**
     * 新建一个队列,用于接受通过 死信交换机路由过来的 死信消息
     */
    public static final String DEAD_MSG_QUEUE = "dead_msg_queue";

    /**
     * 设置 rk=# 表示任意RK的消息过来,都可以路由到 dead_msg_quque这个队列
     */
    public static final String RK_DERAD_MSG_QUEUE = "#";

}

执行 deadInit()方法

package dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

import static dead.DeadConst.*;


public class DeadReject {
    /**
     * 队列名字
     */
    public final static String REJECT_QUEUE_NAME = "reject_queue_test";
    /**
     * routingkey
     */
    public final static String RK_REJECT_QUEUE_NAME = "rk.reject_queue_test";

    /**
     * 声明死信队列信息
     *
     * @throws Exception
     */
    public static void deadInit() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,topic
         */
        channel.exchangeDeclare(DEAD_EXCHANGE, ExchangeTypeEnum.TOPIC.getType());
        channel.queueDeclare(DEAD_MSG_QUEUE, true, false, false, null);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.dead_msg_queue
         */
        channel.queueBind(DEAD_MSG_QUEUE, DEAD_EXCHANGE, RK_DERAD_MSG_QUEUE);

        //关闭通道和连接
        channel.close();
        connection.close();
    }


    /**
     * 生产 Direct直连 交换机的MQ消息
     */
    public static void produce() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,direct
         */
        channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());

        /* 声明(创建)队列  queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete,  Map<String, Object> arguments)
         * queue - 队列名
         * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
         * exclusie - 是否排外的,仅限于当前队列使用
         * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
         */
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//        arguments.put("x-dead-letter-routing-key", RK_REJECT_QUEUE_NAME);
        arguments.put("x-dead-letter-routing-key", "xxx");
        channel.queueDeclare(REJECT_QUEUE_NAME, true, false, false, arguments);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.subscribe_queue_direct
         */
        channel.queueBind(REJECT_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME);


        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         */
        // 消息内容
        String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME, null, message.getBytes());
        System.out.println(" **** Producer  Sent Message: [" + message + "]");


        //关闭通道和连接
        channel.close();
        connection.close();
    }


    public static void consumer() throws Exception {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = MqConnectUtil.getConnectionDefault();
            channel = connection.createChannel();

            /*声明交换机 String exchange
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());


            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(REJECT_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME);

            System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");


            QueueingConsumer consumer = new QueueingConsumer(channel);

            /* 消息确认机制
             * autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
             * autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
             *          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈
             *          !!!!!! 注意这里是 false,手动确认
             */
            channel.basicConsume(REJECT_QUEUE_NAME, false, consumer);

            int count = 0;
            while (count < 10) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");
                doSomeThing(message);
                //返回tag的当前标签,不再重新路由,直接丢弃
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                count++;
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }

    }


    /**
     * 模拟处理复杂逻辑:休眠100ms
     *
     * @param message
     * @throws Exception
     */
    public static void doSomeThing(String message) throws Exception {
        //遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑
        Thread.sleep(100);
    }

    public static void main(String[] args) throws Exception {
        //初始化死信队列
        deadInit();
//        //生产消息
//        produce();
//        //消费者
//        consumer();
    }
}

我们看下 死信交换机exchange_dead和 接受死亡消息的队列
在这里插入图片描述

2.1.4 生产者生产消息
    public static void main(String[] args) throws Exception {
        //初始化死信队列
//        deadInit();
//        //生产消息
        produce();
//        //消费者
//        consumer();
    }

执行 produce() 生产1条消息
在这里插入图片描述

2.1.5 消费者Reject消息丢弃消息

执行consumer()

    public static void main(String[] args) throws Exception {
        //初始化死信队列
//        deadInit();
//        //生产消息
//        produce();
//        //消费者
        consumer();
    }

!!! 注意 consumer的消费者 通过
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
来拒绝消息,第二个参数表示单个tag的丢弃、第三个参数表示不再重新路由到队列
看一下原始队列
在这里插入图片描述
我们看一下死亡信息存放的队列,发现有1条消息,正式我们刚才的consumer拒绝的那条消息
在这里插入图片描述
至此~ 通过消费者Reject,到死信交换机,在通过绑定的RoutingKey:# 路由的过程已经完毕
清空、删除之前的队列信息,继续

2.1.6 消费者Reject消息,重新路由、无限循环

下面我们试一下,多次路由的情况,构造一种场景
队列A发消息->A的消费者拒绝->通过Reject的第三个参数 requeue=true 重新路由->再到当前队列A
取出消息,看下消息的参数到底有何改变
删除之前的死信交换机和之前的队列信息

package dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;

import java.time.LocalDate;
import java.time.LocalTime;

public class QueueMessageRouteTest {

    /**
     * 队列名字
     */
    public final static String REJECT_QUEUE_NAME = "reject_queue_test";
    /**
     * routingkey
     */
    public final static String RK_REJECT_QUEUE_NAME = "rk.reject_queue_test";


    /**
     * 生产 Direct直连 交换机的MQ消息
     */
    public static void produce() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,direct
         */
        channel.exchangeDeclare(subscrib3.ExchangeTypeEnum.DIRECT.getName(), subscrib3.ExchangeTypeEnum.DIRECT.getType());

        /* 声明(创建)队列  queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete,  Map<String, Object> arguments)
         * queue - 队列名
         * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
         * exclusie - 是否排外的,仅限于当前队列使用
         * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
         */
        channel.queueDeclare(REJECT_QUEUE_NAME, true, false, false, null);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.subscribe_queue_direct
         */
        channel.queueBind(REJECT_QUEUE_NAME, subscrib3.ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME);


        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         */
        // 消息内容
        String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        channel.basicPublish(subscrib3.ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME, null, message.getBytes());
        System.out.println(" **** Producer  Sent Message: [" + message + "]");


        //关闭通道和连接
        channel.close();
        connection.close();
    }


    public static void consumer() throws Exception {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = MqConnectUtil.getConnectionDefault();
            channel = connection.createChannel();

            /*声明交换机 String exchange
             * 参数明细
             * 1、交换机名称
             * 2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(subscrib3.ExchangeTypeEnum.DIRECT.getName(), subscrib3.ExchangeTypeEnum.DIRECT.getType());


            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             * 参数明细
             * 1、队列名称
             * 2、交换机名称
             * 3、路由key
             */
            channel.queueBind(REJECT_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_REJECT_QUEUE_NAME);

            System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");


            QueueingConsumer consumer = new QueueingConsumer(channel);

            /* 消息确认机制
             * autoAck true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
             * autoAck false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态
             *          并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈
             *          !!!!!! 注意这里是 false,手动确认
             */
            channel.basicConsume(REJECT_QUEUE_NAME, false, consumer);

            int count = 0;
            while (count < 10) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" count:" + count + " **** Consumer->2 Received '" + message + "'");
                doSomeThing(message);
                //返回tag的当前标签,不再重新路由,直接丢弃
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                count++;
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }

    }


    /**
     * 模拟处理复杂逻辑:休眠100ms
     *
     * @param message
     * @throws Exception
     */
    public static void doSomeThing(String message) throws Exception {
        //遍历Count ,sleep , 接收一条消息后休眠 100 毫秒,模仿复杂逻辑
        Thread.sleep(100);
    }

    public static void main(String[] args) throws Exception {
//        //生产消息
//            produce();
//        //消费者
//        consumer();
    }
}

执行produce生产1条消息,然后同reject 来控制重新路由,启动下消费者
!!! 注意消费者我循环了10次,如果会重新路由,肯定会有多次消费消费的
在这里插入图片描述
而且队列中始终有这一条消息存在
在这里插入图片描述
所以为了避免出现我们无限消费的情况,对消息的幂等性进行处理,我们要对消息设置唯一ID,消费过的消息再次过来时候,拒绝消费
清空、删除之前的队列信息,继续


2.2 消息TTL过期

消息设置生命周期,超过生命周期的消息,会通过死信交换机、再次路由到死信队列

2.2.1 设置TTL消息生命周期,生产消息

初始化死信队列 deadInit(),生产消息执行produce(),我们设置TTL=30s

package dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

import static dead.DeadConst.*;


public class DeadTtlDelete {
    /**
     * 队列名字
     */
    public final static String TTL_QUEUE_NAME = "ttl_queue_test";
    /**
     * routingkey
     */
    public final static String RK_TTL_QUEUE_NAME = "rk.ttl_queue_test";

    /**
     * 声明死信队列信息
     *
     * @throws Exception
     */
    public static void deadInit() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,topic
         */
        channel.exchangeDeclare(DEAD_EXCHANGE, ExchangeTypeEnum.TOPIC.getType());
        channel.queueDeclare(DEAD_MSG_QUEUE, true, false, false, null);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.dead_msg_queue
         */
        channel.queueBind(DEAD_MSG_QUEUE, DEAD_EXCHANGE, RK_DERAD_MSG_QUEUE);

        //关闭通道和连接
        channel.close();
        connection.close();
    }


    /**
     * 生产 Direct直连 交换机的MQ消息
     */
    public static void produce() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,direct
         */
        channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());

        /* 声明(创建)队列  queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete,  Map<String, Object> arguments)
         * queue - 队列名
         * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
         * exclusie - 是否排外的,仅限于当前队列使用
         * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
         */
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", "xxx");
        arguments.put("x-message-ttl", 30000);
        channel.queueDeclare(TTL_QUEUE_NAME, true, false, false, arguments);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.subscribe_queue_direct
         */
        channel.queueBind(TTL_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_TTL_QUEUE_NAME);


        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         */
        // 消息内容
        String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_TTL_QUEUE_NAME, null, message.getBytes());
        System.out.println(" **** Producer  Sent Message: [" + message + "]");


        //关闭通道和连接
        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        //初始化死信队列
        deadInit();
//        //生产消息
        produce();
    }
}

执行后我们看下队列信息在这里插入图片描述

2.2.1 TTL时间过期后,消息到死信队列

30s过后,原始队列中的消息死亡消失,之前死信队列没消息,现在出现了一条消息
在这里插入图片描述
至此~ 设置TTL超时死亡的消息,路由到死信队列完毕
清空、删除之前的队列信息,继续


2.3 队列达到最大长度
2.3.1 设置队列长度,生产消息

先初始化死信队列deadInit(),然后执行produce(),设置队列长度为1,但是我们生产3条消息,看下多出的2条消息如何处理

package dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

import static dead.DeadConst.*;


public class DeadMaxLengthReject {
    /**
     * 队列名字
     */
    public final static String MAX_LENGTH_QUEUE_NAME = "max_length_queue_test";
    /**
     * routingkey
     */
    public final static String RK_MAX_LENGTH_QUEUE_NAME = "rk.max_length_queue_test";

    /**
     * 声明死信队列信息
     *
     * @throws Exception
     */
    public static void deadInit() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,topic
         */
        channel.exchangeDeclare(DEAD_EXCHANGE, ExchangeTypeEnum.TOPIC.getType());
        channel.queueDeclare(DEAD_MSG_QUEUE, true, false, false, null);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.dead_msg_queue
         */
        channel.queueBind(DEAD_MSG_QUEUE, DEAD_EXCHANGE, RK_DERAD_MSG_QUEUE);

        //关闭通道和连接
        channel.close();
        connection.close();
    }


    /**
     * 生产 Direct直连 交换机的MQ消息
     */
    public static void produce(Integer i) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,direct
         */
        channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());

        /* 声明(创建)队列  queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete,  Map<String, Object> arguments)
         * queue - 队列名
         * durable - 是否是持久化队列, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失
         * exclusie - 是否排外的,仅限于当前队列使用
         * autoDelete - 是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,可以通过界面 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * arguments - 队列携带的参数 比如 ttl-生命周期,x-dead-letter 死信队列等等
         */
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", "xxx");
        arguments.put("x-max-length", 1);
        channel.queueDeclare(MAX_LENGTH_QUEUE_NAME, true, false, false, arguments);

        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.subscribe_queue_direct
         */
        channel.queueBind(MAX_LENGTH_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_MAX_LENGTH_QUEUE_NAME);


        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机 ,"" 空时候指定的是 获取的virtualHost 虚拟服务器的 默认的exchang,每个virtualHost都有一个AMQP default type:direct 直接转发
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         */
        // 消息内容
        String message = "i" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_MAX_LENGTH_QUEUE_NAME, null, message.getBytes());
        System.out.println(" **** Producer  Sent Message: [" + message + "]");


        //关闭通道和连接
        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        //初始化死信队列
        deadInit();
        for (int i = 0; i < 3; i++) {
            //   生产消息
            produce(i);
        }

    }


}

看下生产消息后,队列中的结果
在这里插入图片描述

2.3.2 查看超出队列长度消息去向

可以看到i=0和i=1超出了队列长度,导致被死信交换机路由到死信队列中
在这里插入图片描述
至此~ 超出队列长度导致死信队列路由的完毕


下一篇 我们讲一下 RabbitMQ系列(十一)RabbitMQ进阶-Queue队列详解-延时队列

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ系列(十)RabbitMQ进阶-Queue队列参数详解-死信交换机 的相关文章

随机推荐

  • C++进阶必读书籍

    结合一些我的学习经历 希望对于想学C 的人有些帮助 大家有什么好想法望提出 我老师最初是从C语言教起的 用的是潭浩强的 lt
  • ajax day3

    3 将普通对象转为查询参数字符串形式 创建URLSearchParams参数 再用toString方法转为字符串 4 xhr对象 请求参数 body参数 5 promise promise对象一旦被兑现或拒绝 就是已敲定了 状态无法再被改变
  • Python模块之操作数据库MySQL篇

    目录 一 安装PyMySQL模块 二 操作数据库 1 连接数据库 2 执行sql语句 execute和executemany 3 创建数据表 三 操作MySQL数据表 1 新增数据 2 查询数据 3 修改数据 4 删除数据 5 踩到的坑 一
  • python批量读取Excel文件

    将同一个文件夹下的xlsx文件读取 import os import pandas as pd path r path of file for i in os listdir path df pd read excel os path jo
  • [读论文]CAAD-2018 Targeted Attack方向季军技术报告

    这次分享的是CAAD 2018比赛中Northwest Security团队的技术报告 该团队在此次比赛中取得了了targeted Attack 方向第三名 non targeted Attack方向第四名的成绩 题目 Leverage O
  • 构建 fluentd 镜像与部署应用

    本文将具体介绍如何在基础镜像 ubuntu 20 04 上搭建 fluentd 镜像 并且实现监控指定目录的日志文件 构建镜像 首先 从 docker hub 中挑选一个合适的基础镜像 例如 ubuntu 20 04 docker pull
  • 区块链+物联网=?

    链客 专为开发者而生 有问必答 此文章来自区块链技术社区 未经允许拒绝转载 区块链与物联网 IoT 的交叉应用已成为最有前途的区块链用例之一 在过去的几个月里 IoTeX一直与我们的战略合作伙伴合作 并进行了独立的研究 为了能够在短期内采用
  • 新闻

    4月 中国科技产业智库甲子光年发布 AIGC应用与实践研究展望报告 及AIGC产业图谱 面向AIGC技术创新者 产业参与者 资本机构和政府等各方展现AIGC产业的整体生态环境和行业发展 华院数智人凭借其在生成式AI技术 人机交互能力和市场应
  • esp32+vscode环境搭建速记

    esp32idf vscode环境搭建速记 建议按照入下步骤进行 在vscode插件里安装esp32idf 或者用在线的下载器安装会出现一些莫名奇妙的问题 第一步 安装esp32idf 官方网址 https dl espressif cn
  • gridlayout java_Swing-布局管理器之GridLayout(网格布局)-入门

    网格布局特点 l 使容器中的各组件呈M行 N列的网格状分布 l 网格每列宽度相同 等于容器的宽度除以网格的列数 l 网格每行高度相同 等于容器的高度除以网格的行数 l 各组件的排列方式为 从上到下 从左到右 l 组件放入容器的次序决定了它在
  • idea中thymeleaf语法不提示的所有原因

    首先pom xml里面要导入thymeleaf的依赖 然后在html中加入 xmlns th http www thymeleaf org 最后点击file gt settings 查看插件是否使用 未使用点击打勾重启
  • AMS1117典型电路

    AMS1117 3 3V 5V 封装 常见应用连接 1 输入旁路电容Input Bypass Capacitor A 10uF tantalum on the input is a suitable input bypassing fora
  • 解决 SyntaxError:Unexpected end of JSON input 或 Unexpected token u in JSON at position 0 问题

    1 报错原因 JSON 接收的数据不完整 或者数据格式不符合要求 如 undefined 2 JSON 数据格式要求 1 JSON文件都是被包裹在一个大括号中 通过key value的方式来表达数据 2 JSON的Key必须包裹在一个双引号
  • Python 魔法方法(三) __getattr__,__setattr__, __delattr__

    1 getattr 当我们访问一个不存在的属性的时候 会抛出异常 提示我们不存在这个属性 而这个异常就是 getattr 方法抛出的 其原因在于他是访问一个不存在的属性的最后落脚点 作为异常抛出的地方提示出错再适合不过了 看例子 我们找一个
  • 调试最长的一帧(第27天)

    对于几个多线程渲染中的成员变量 继续抄一抄 Block阻塞器 BlockCount 计数器类 它与阻塞器类的使用方法基本相同 block 阻塞线程 release 释放线程 不过除此之外 BlockCount的构造函数还可以设置一个阻塞计数
  • Point Cloud Library学习之ICP迭代最近点匹配法NDT2D正态分布转换法

    参考来源 https pointclouds org documentation classpcl 1 1 registration html ab1d64f86162b2df716ead8d978579c11 http epsilonjo
  • ctab提取dna流程图_核酸提取、纯化与常见问题解答

    写在前面 核酸提取包括DNA提取 RNA提取 质粒提取 核酸是遗传信息的携带者 是基因表达的物质基础 是分子生物学研究的主要对象 无论是进行核酸的结构还是功能研究 首先都需要对核酸进行提取和纯化 核酸是生命的最基本物质之一 可分为DNA和R
  • Kotlin协程的简单用法(GlobalScope、lifecycleScope、viewModelScope)

    协程 Coroutine 协程就像非常轻量级的线程 线程是由系统调度的 线程切换或线程阻塞的开销都比较大 而协程依赖于线程 但是协程挂起时不需要阻塞线程 协程是由开发者控制的 所以协程也像用户态的线程 非常轻量级 一个线程中可以创建任意个协
  • 目前开源数据集整理

    Attention 我的Dr Sure项目正式上线了 主旨在分享学习Tensorflow以及DeepLearning中的一些想法 期间随时更新我的论文心得以及想法 Github地址 https github com wangqingbaid
  • RabbitMQ系列(十)RabbitMQ进阶-Queue队列参数详解-死信交换机

    RabbitMQ进阶 Queue队列参数详解 死信交换机 文章目录 RabbitMQ进阶 Queue队列参数详解 死信交换机 1 Dead Letter Exchange 介绍 2 死信消息方式 2 1 消息被拒绝 2 1 1 channe