federation和shovel
federation-exchange
问题的由来:
城市A有rabbitmqA,城市B有rabbitmqB,当城市B的应用要发消息到exchangeA的时候,会因为网络原因,导致发送时间延时。
federation-exchange的作用:
federation提供了一个能力,让城市B的mq去接收exchangeA的消息,然后再把消息转发到城市A的exchangeA
案例演示
-
准备两台rabbitmq服务,保证每台节点单独运行
-
在每台机器上开启federation相关插件
rabbitmq-plugins enable rabbitmq_federation --offline
rabbitmq-plugins enable rabbitmq_federation_managemen --offline
-
开启后在管理台页面发现新增选项卡
-
运行ConsumerFeb
的代码
- 通过
ConsumerFeb
代码创建了fed-queue
和fed-exchange
public class ConsumerFeb {
private static final String QUEUE_NAME="fed-queue";
private static final String EXCHANGE_NAME="fed-exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.131");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"aaa");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String result = new String(message.getBody());
System.out.println("消费者接收到消息,消息内容为:"+result);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
-
Feb添加upstream
-
添加policy
-
查看federation status
-
添加ConsumerJan消费者代码
public class ConsumerJan {
private static final String QUEUE_NAME="federation-queue";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.130");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String result = new String(message.getBody());
System.out.println("消费者接收到消息,消息内容为:"+result);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
-
添加生产者代码
public class Producer {
private static final String EXCHANGE_NAME="fed-exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.130");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕");
}
}
-
启动消费者Feb,消费者Jan,再启动生产者
- 发现生产者发送消息到Jan,通过federation将消息转发到Feb,在消费者Feb中得到输出
federation-queue
shovel
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)