RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(4)-头交换机
1.发布/订阅 模式
本篇文章紧接 发布订阅模式前篇 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)
本片文章接着讲 发布订阅模式的另外几种交换机
2.Direct 直连交换机
在之前文章中讲过 RabbitMQ系列(五)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(1)-直连交换机
3.Fanout 扇形交换机
在之前文章讲过 RabbitMQ系列(六)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(2)-扇形交换机
4.Topic 主题交换机
在之前文章讲过 RabbitMQ系列(七)RabbitMQ进阶-Queue队列特性 (三) 发布/订阅 模式(3)-主题交换机
5.Header 头交换机
说实话,头交换机用的比较少,没想到他具体的用法,但是他和之前几个交换机最大的区别是
1.扇形是广播,忽略 RoutingKey,直连、主题都需要RoutingKey,但是头交换机不需要RoutingKey
2.头交换机的匹配规则不被限定为字符串,它可以是对象Object
3.头交换机是Key:value模式的,通过x-match的参数 any/all 来区分是否路由
- any:任何一个键值对匹配,就路由
- all: 必须所有的键值对都匹配,才路由
形象的用下面的图说明
x-match:any , message的 key1:value1匹配到了 队列Q1,就路由到1队列
x-match:all,并不是所有都匹配,消息不会路由到队列Q2
5.1 代码实战
这次我们还不创建消费者,直接通过队列的消息数来看,消息经过Header 交换机后,到底如何分发
先创建包,新建包topic,包下面新建交换机枚举ExchangeTypeEnum
package header;
public enum ExchangeTypeEnum {
DIRECT("exchange-direct-name", "direct"),
FANOUT("exchange-fanout-name", "fanout"),
TOPIC("exchange-topic-name", "topic"),
HEADER("exchange-header-name", "header"),
UNKNOWN("unknown-exchange-name", "direct");
/**
* 交换机名字
*/
private String name;
/**
* 交换机类型
*/
private String type;
ExchangeTypeEnum(String name, String type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public static ExchangeTypeEnum getEnum(String type) {
ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();
for (ExchangeTypeEnum exchange : exchangeArrays) {
if (exchange.getName().equals(type)) {
return exchange;
}
}
return ExchangeTypeEnum.UNKNOWN;
}
}
创建主题交换机常量类,定义一些 RK及队列
package header;
public class HeaderConst {
/**
* 消息订阅队列 G
*/
public final static String SUBSCRIBE_QUEUE_NAME_HEADER_G = "subscribe_queue_header_G";
/**
* 消息订阅队列 H
*/
public final static String SUBSCRIBE_QUEUE_NAME_HEADER_H = "subscribe_queue_header_H";
/**
* 消息订阅队列 I
*/
public final static String SUBSCRIBE_QUEUE_NAME_HEADER_I = "subscribe_queue_header_I";
/**
* 路由RoutingKey G
*/
public final static String ROUTINGKEY_G = "rk.company";
/**
* 路由RoutingKey H
*/
public final static String ROUTINGKEY_H = "rk.company.*";
/**
* 路由RoutingKey I
*/
public final static String ROUTINGKEY_I = "*.*.employee";
}
5.1.1 生产者
我们通过设置不同的头Header的Map及 队列中的x-match规则,发送消息,看队列中是否接收到了消息,来验证我们的猜想,我们设置了任意的RoutingKey,因为头交换机的规则不受RK的影响
package header;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import static header.HeaderConst.*;
public class HeaderProducer {
/**
* 生产 Direct直连 交换机的MQ消息
*/
public static void produceTopicExchangeMessage(Map<String, Object> sendHeadMaps) throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,fanout
*/
channel.exchangeDeclare(ExchangeTypeEnum.HEADER.getName(), ExchangeTypeEnum.HEADER.getType());
/*声明队列
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_G, true, false, false, null);
channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_H, true, false, false, null);
channel.queueDeclare(SUBSCRIBE_QUEUE_NAME_HEADER_I, true, false, false, null);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.subscribe_queue_direct
*/
channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_G, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_G, headG());
channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_H, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_H, headH());
channel.queueBind(SUBSCRIBE_QUEUE_NAME_HEADER_I, ExchangeTypeEnum.HEADER.getName(), ROUTINGKEY_I, headI());
/*
* 定义消息头 Header
*/
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(sendHeadMaps).build();
//定义消息内容(发布多条消息)
String message = " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交换机 DirectExchange
* queuename - 队列信息
* props - 参数信息
* message 消息体 byte[]类型
*
* !!!! 注意 我这里的 RoutingKey并没有设置 值,传空
*/
channel.basicPublish(ExchangeTypeEnum.HEADER.getName(), "", props, message.getBytes());
System.out.println(" **** Producer Sent Message: '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
/**
* 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 G
*
* @return
*/
public static Map<String, Object> headG() {
Map<String, Object> gQueueMapHeaders = new HashMap<>();
gQueueMapHeaders.put("G", "g");
gQueueMapHeaders.put("H", "h");
gQueueMapHeaders.put("I", 1);
gQueueMapHeaders.put("x-match", "all");
return gQueueMapHeaders;
}
/**
* 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 H
*
* @return
*/
public static Map<String, Object> headH() {
Map<String, Object> gQueueMapHeaders = new HashMap<>();
gQueueMapHeaders.put("G", "g");
gQueueMapHeaders.put("I", 1);
gQueueMapHeaders.put("x-match", "any");
return gQueueMapHeaders;
}
/**
* 生成 g队列的Header ,x-match = all 表示全匹配 所有的key,value时候,才路由到 I
*
* @return
*/
public static Map<String, Object> headI() {
Map<String, Object> gQueueMapHeaders = new HashMap<>();
gQueueMapHeaders.put("I", 1);
gQueueMapHeaders.put("x-match", "all");
return gQueueMapHeaders;
}
/**
* 发送者 构造G、H、I 的Map,消费者结果 :G、H、I 全都能匹配
*/
public static void produce1() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中有I和I队列的I全部匹配,这点有点疑惑
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("H", "h");
headerMap.put("I", 1);
produceTopicExchangeMessage(headerMap);
}
/**
* 发送者 构造G、I 的Map,消费者结果 :H、I 都能匹配
*/
public static void produce2() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中有I 和I队列的I全部匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("I", 1);
produceTopicExchangeMessage(headerMap);
}
/**
* 发送者 构造G、H 的Map,消费者结果 :H 都能匹配
*/
public static void produce3() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中没有I 和I队列的 完全不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("H", "h");
produceTopicExchangeMessage(headerMap);
}
/**
* 发送者 构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 ,消费者结果 : 没有匹配
*/
public static void produce4() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不匹配
* I队列 x-match:all 全匹配,生产者中有I 和I队列的I的value也 不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("I", "1");
produceTopicExchangeMessage(headerMap);
}
/**
* 发送者 构造G、I 的Map,消费者结果 :队列H、I 都能匹配
*/
public static void produce5() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
* I队列 x-match:all 全匹配,生产者中有H 和I队列的I 都不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("H", "h");
produceTopicExchangeMessage(headerMap);
}
public static void main(String[] argv) throws Exception {
// produce1();
// produce2();
// produce3();
// produce4();
// produce5();
}
}
5.1.2 produce1 运行
produce1 以 运行
先看下队列 3个队列及头交换机,队列分别绑定到 exchange header的主题交换机上
而且现在3个队列中是没有消息的,积攒的消息全都是 0
5.1.2 produce1 结果
以{ G:g、H:h、I:1 } 作为发送者Header的Map 发消息,结果G、H、I 全都匹配,收到了1条消息
分析:
- G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
- H队列 x-match:any 任一匹配,头由G、I组成,匹配
- I队列 x-match:all 全匹配,头由I组成,生产者中有I和I队列的I全部 匹配,这点有疑惑
/**
* 发送者 构造G、H、I 的Map,消费者结果 :G、H、I 全都能匹配
*/
public static void produce1() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中有I和I队列的I全部匹配,这点有点疑惑
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("H", "h");
headerMap.put("I", 1);
produceTopicExchangeMessage(headerMap);
}
队列结果 G、H、I全都1条消息
Purge Messages 清空有消息的队列,继续下一步
5.1.3 produce2 运行-结果
以{ G:g、I:1 } 作为发送者Header的Map 发消息,结果H、I 都匹配,各收到了1条消息
分析:
- G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
- H队列 x-match:any 任一匹配,头由G、I组成,任一G、I 匹配
- I队列 x-match:all 全匹配,头由I组成,生产者中I和I队列的I全部 匹配
/**
* 发送者 构造G、I 的Map,消费者结果 :H、I 都能匹配
*/
public static void produce2() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个H 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中有I 和I队列的I全部匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("I", 1);
produceTopicExchangeMessage(headerMap);
}
查看队列结果,只有H、I匹配
Purge Messages 清空有消息的队列,继续下一步
5.1.4 produce3 运行-结果
以{ G:g、H:h } 作为发送者Header的Map 发消息,结果H 匹配,收到了1条消息
分析:
- G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
- H队列 x-match:any 任一匹配,头由G、I组成,任一 G、I 匹配
- I队列 x-match:all 全匹配,头由I组成,生产者中没有I 和I队列的 完全不匹配
/**
* 发送者 构造G、H 的Map,消费者结果 :H 都能匹配
*/
public static void produce3() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少一个I 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,匹配
* I队列 x-match:all 全匹配,生产者中没有I 和I队列的 完全不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("G", "g");
headerMap.put("H", "h");
produceTopicExchangeMessage(headerMap);
}
查看队列结果,只有H收到消息
Purge Messages 清空有消息的队列,继续下一步
5.1.5 produce4 运行-结果
以{ I:“1” } 作为发送者,构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 的 Header的Map 发消息,结果没有匹配
分析:
- G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
- H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不是同一Map对象,不匹配
- I队列 x-match:all 全匹配,头由I组成,生产者中有I 和I队列的I的value也 不匹配
/**
* 发送者 构造I 的Map,但是这个 1的value 是String,而不是 Integer 1 ,消费者结果 : 没有匹配
*/
public static void produce4() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、H 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,但是I的value 不匹配
* I队列 x-match:all 全匹配,生产者中有I 和I队列的I的value也 不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("I", "1");
produceTopicExchangeMessage(headerMap);
}
查看队列结果,没问题,没有队列能匹配,收到消息
继续下一步
5.1.6 produce5 运行-结果
以{ H:h } 作为发送者Header的Map 发消息,结果没有匹配
分析:
- G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、I 不匹配
- H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
- I队列 x-match:all 全匹配,头由I组成,生产者中有H 和I队列的I 不匹配
/**
* 发送者 构造G、I 的Map,消费者结果 :没有匹配
*/
public static void produce5() throws Exception {
/*
* G队列 x-match:all 全匹配 ,头由 G、H、I组成,少G、I 不匹配
* H队列 x-match:any 任一匹配,头由G、I组成,没有任一匹配H,所以不匹配
* I队列 x-match:all 全匹配,生产者中有H 和I队列的I 不匹配
*/
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("H", "h");
produceTopicExchangeMessage(headerMap);
}
查看队列结果,没问题,没有队列收到消息
下篇我们介绍 RabbitMQ进阶-Queue队列参数详解