RabbitMQ系列(八)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(4)-头交换机

2023-10-30

RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(4)-头交换机

1.发布/订阅 模式

本篇文章紧接 发布订阅模式前篇 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)
本片文章接着讲 发布订阅模式的另外几种交换机

2.Direct 直连交换机

在之前文章中讲过 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机

3.Fanout 扇形交换机

在之前文章讲过 RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机

4.Topic 主题交换机

在之前文章讲过 RabbitMQ系列(七)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(3)-主题交换机

5.Header 头交换机

说实话,头交换机用的比较少,没想到他具体的用法,但是他和之前几个交换机最大的区别是
1.扇形是广播,忽略 RoutingKey,直连、主题都需要RoutingKey,但是头交换机不需要RoutingKey
2.头交换机的匹配规则不被限定为字符串,它可以是对象Object
3.头交换机是Key:value模式的,通过x-match的参数 any/all 来区分是否路由

  • any:任何一个键值对匹配,就路由
  • all: 必须所有的键值对都匹配,才路由

形象的用下面的图说明
x-match:any , message的 key1:value1匹配到了 队列Q1,就路由到1队列
x-match:all,并不是所有都匹配,消息不会路由到队列Q2
在这里插入图片描述

5.1 代码实战

这次我们还不创建消费者,直接通过队列的消息数来看,消息经过Header 交换机后,到底如何分发
先创建包,新建包topic,包下面新建交换机枚举ExchangeTypeEnum

package header;

public enum ExchangeTypeEnum {

    DIRECT("exchange-direct-name", "direct"),
    FANOUT("exchange-fanout-name", "fanout"),
    TOPIC("exchange-topic-name", "topic"),
    HEADER("exchange-header-name", "header"),
    UNKNOWN("unknown-exchange-name", "direct");

    /**
     * 交换机名字
     */
    private String name;
    /**
     * 交换机类型
     */
    private String type;

    ExchangeTypeEnum(String name, String type) {
        this.name = name;
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public String getType() {
        return type;
    }

    public static ExchangeTypeEnum getEnum(String type) {
        ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();
        for (ExchangeTypeEnum exchange : exchangeArrays) {
            if (exchange.getName().equals(type)) {
                return exchange;
            }
        }
        return ExchangeTypeEnum.UNKNOWN;
    }

}

创建主题交换机常量类,定义一些 RK及队列

package header;

public class HeaderConst {
    /**
     * 消息订阅队列 G
     */
    public final static String SUBSCRIBE_QUEUE_NAME_HEADER_G = "subscribe_queue_header_G";
    /**
     * 消息订阅队列 H
     */
    public final static String SUBSCRIBE_QUEUE_NAME_HEADER_H = "subscribe_queue_header_H";

    /**
     * 消息订阅队列 I
     */
    public final static String SUBSCRIBE_QUEUE_NAME_HEADER_I = "subscribe_queue_header_I";

    /**
     * 路由RoutingKey G
     */
    public final static String ROUTINGKEY_G = "rk.company";
    /**
     * 路由RoutingKey H
     */
    public final static String ROUTINGKEY_H = "rk.company.*";
    /**
     * 路由RoutingKey I
     */
    public final static String ROUTINGKEY_I = "*.*.employee";

}


5.1.1 生产者

我们通过设置不同的头Header的Map及 队列中的x-match规则,发送消息,看队列中是否接收到了消息,来验证我们的猜想,我们设置了任意的RoutingKey,因为头交换机的规则不受RK的影响

package header;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

import static header.HeaderConst.*;


public class HeaderProducer {

    /**
     * 生产 Direct直连 交换机的MQ消息
     */
    public static void produceTopicExchangeMessage(Map<String, Object> sendHeadMaps) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = MqConnectUtil.getConnectionDefault();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        /*声明 直连交换机 交换机 String exchange,
         * 参数明细
         * 1、交换机名称
         * 2、交换机类型,fanout
         */
        channel.exchangeDeclare(ExchangeTypeEnum.HEADER.getName(), ExchangeTypeEnum.HEADER.getType());

        /*声明队列
         * 参数明细:
         * 1、队列名称
         * 2、是否持久化
         * 3、是否独占此队列
         * 4、队列不用是否自动删除
         * 5、参数
         */
        channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_G, true, false, false, null);
        channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_H, true, false, false, null);
        channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_I, true, false, false, null);


        /*交换机和队列绑定String queue, String exchange, String routingKey
         * 参数明细
         * 1、队列名称
         * 2、交换机名称
         * 3、路由key rk.subscribe_queue_direct
         */


        channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_G, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_G, headG());
        channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_H, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_H, headH());
        channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_I, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_I, headI());

        /*
         * 定义消息头 Header
         */
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(sendHeadMaps).build();

        //定义消息内容(发布多条消息)
        String message = " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
        /* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange - 交换机  DirectExchange
         * queuename - 队列信息
         * props - 参数信息
         * message 消息体 byte[]类型
         *
         * !!!! 注意 我这里的 RoutingKey并没有设置 值,传空
         */
        channel.basicPublish(ExchangeTypeEnum.HEADER.getName(), "", props, message.getBytes());
        System.out.println(" ****  Producer  Sent Message: '" + message + "'");

        //关闭通道和连接
        channel.close();
        connection.close();
    }


    /**
     * 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 G
     *
     * @return
     */
    public static Map<String, Object> headG() {
        Map<String, Object> gQueueMapHeaders = new HashMap<>();
        gQueueMapHeaders.put("G", "g");
        gQueueMapHeaders.put("H", "h");
        gQueueMapHeaders.put("I", 1);
        gQueueMapHeaders.put("x-match", "all");
        return gQueueMapHeaders;
    }


    /**
     * 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 H
     *
     * @return
     */
    public static Map<String, Object> headH() {
        Map<String, Object> gQueueMapHeaders = new HashMap<>();
        gQueueMapHeaders.put("G", "g");
        gQueueMapHeaders.put("I", 1);
        gQueueMapHeaders.put("x-match", "any");
        return gQueueMapHeaders;
    }


    /**
     * 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 I
     *
     * @return
     */
    public static Map<String, Object> headI() {
        Map<String, Object> gQueueMapHeaders = new HashMap<>();
        gQueueMapHeaders.put("I", 1);
        gQueueMapHeaders.put("x-match", "all");
        return gQueueMapHeaders;
    }

    /**
     * 发送者 构造G、H、I 的Map,消费者结果 :G、H、I 全都能匹配
     */
    public static void produce1() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中有I和I队列的I全部匹配,这点有点疑惑
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("H", "h");
        headerMap.put("I", 1);

        produceTopicExchangeMessage(headerMap);
    }


    /**
     * 发送者 构造G、I 的Map,消费者结果 :H、I 都能匹配
     */
    public static void produce2() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中有I 和I队列的I全部匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("I", 1);

        produceTopicExchangeMessage(headerMap);
    }

    /**
     * 发送者 构造G、H 的Map,消费者结果 :H 都能匹配
     */
    public static void produce3() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中没有I 和I队列的 完全不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("H", "h");

        produceTopicExchangeMessage(headerMap);
    }

    /**
     * 发送者 构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 ,消费者结果 : 没有匹配
     */
    public static void produce4() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不匹配
         * I队列 x-match:all 全匹配,生产者中有I 和I队列的I的value也 不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("I", "1");

        produceTopicExchangeMessage(headerMap);
    }

    /**
     * 发送者 构造G、I 的Map,消费者结果 :队列H、I 都能匹配
     */
    public static void produce5() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
         * I队列 x-match:all 全匹配,生产者中有H 和I队列的I 都不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("H", "h");

        produceTopicExchangeMessage(headerMap);
    }

    public static void main(String[] argv) throws Exception {

//        produce1();
//        produce2();
//        produce3();
//        produce4();
//        produce5();
    }
}


5.1.2 produce1 运行

produce1 以 运行
先看下队列 3个队列及头交换机,队列分别绑定到 exchange header的主题交换机上
在这里插入图片描述

而且现在3个队列中是没有消息的,积攒的消息全都是 0
在这里插入图片描述

5.1.2 produce1 结果

以{ G:g、H:h、I:1 } 作为发送者Header的Map 发消息,结果G、H、I 全都匹配,收到了1条消息
分析:

  • G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
  • H队列 x-match:any 任一匹配,头由G、I组成,匹配
  • I队列 x-match:all 全匹配,头由I组成,生产者中有I和I队列的I全部 匹配,这点有疑惑
    /**
     * 发送者 构造G、H、I 的Map,消费者结果 :G、H、I 全都能匹配
     */
    public static void produce1() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中有I和I队列的I全部匹配,这点有点疑惑
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("H", "h");
        headerMap.put("I", 1);

        produceTopicExchangeMessage(headerMap);
    }

队列结果 G、H、I全都1条消息
在这里插入图片描述
Purge Messages 清空有消息的队列,继续下一步


5.1.3 produce2 运行-结果

以{ G:g、I:1 } 作为发送者Header的Map 发消息,结果H、I 都匹配,各收到了1条消息
分析:

  • G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
  • H队列 x-match:any 任一匹配,头由G、I组成,任一G、I 匹配
  • I队列 x-match:all 全匹配,头由I组成,生产者中I和I队列的I全部 匹配
    /**
     * 发送者 构造G、I 的Map,消费者结果 :H、I 都能匹配
     */
    public static void produce2() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中有I 和I队列的I全部匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("I", 1);

        produceTopicExchangeMessage(headerMap);
    }

查看队列结果,只有H、I匹配
在这里插入图片描述
Purge Messages 清空有消息的队列,继续下一步


5.1.4 produce3 运行-结果

以{ G:g、H:h } 作为发送者Header的Map 发消息,结果H 匹配,收到了1条消息
分析:

  • G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
  • H队列 x-match:any 任一匹配,头由G、I组成,任一 G、I 匹配
  • I队列 x-match:all 全匹配,头由I组成,生产者中没有I 和I队列的 完全不匹配
    /**
     * 发送者 构造G、H 的Map,消费者结果 :H 都能匹配
     */
    public static void produce3() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,匹配
         * I队列 x-match:all 全匹配,生产者中没有I 和I队列的 完全不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("G", "g");
        headerMap.put("H", "h");

        produceTopicExchangeMessage(headerMap);
    }

查看队列结果,只有H收到消息
在这里插入图片描述
Purge Messages 清空有消息的队列,继续下一步


5.1.5 produce4 运行-结果

以{ I:“1” } 作为发送者,构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 的 Header的Map 发消息,结果没有匹配
分析:

  • G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
  • H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不是同一Map对象,不匹配
  • I队列 x-match:all 全匹配,头由I组成,生产者中有I 和I队列的I的value也 不匹配
   /**
     * 发送者 构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 ,消费者结果 : 没有匹配
     */
    public static void produce4() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不匹配
         * I队列 x-match:all 全匹配,生产者中有I 和I队列的I的value也 不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("I", "1");

        produceTopicExchangeMessage(headerMap);
    }

查看队列结果,没问题,没有队列能匹配,收到消息
在这里插入图片描述

继续下一步


5.1.6 produce5 运行-结果

以{ H:h } 作为发送者Header的Map 发消息,结果没有匹配
分析:

  • G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、I 不匹配
  • H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
  • I队列 x-match:all 全匹配,头由I组成,生产者中有H 和I队列的I 不匹配
  /**
     * 发送者 构造G、I 的Map,消费者结果 :没有匹配
     */
    public static void produce5() throws Exception {
        /*
         * G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、I 不匹配
         * H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
         * I队列 x-match:all 全匹配,生产者中有H 和I队列的I 不匹配
         */
        Map<String, Object> headerMap = new HashMap<>();
        headerMap.put("H", "h");

        produceTopicExchangeMessage(headerMap);
    }

查看队列结果,没问题,没有队列收到消息
在这里插入图片描述


下篇我们介绍 RabbitMQ进阶-Queue队列参数详解

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ系列(八)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(4)-头交换机 的相关文章

随机推荐