15.federation

2023-05-16

federation和shovel

federation-exchange

问题的由来:

城市A有rabbitmqA,城市B有rabbitmqB,当城市B的应用要发消息到exchangeA的时候,会因为网络原因,导致发送时间延时。

federation-exchange的作用:

federation提供了一个能力,让城市B的mq去接收exchangeA的消息,然后再把消息转发到城市A的exchangeA

在这里插入图片描述

案例演示

  • 准备两台rabbitmq服务,保证每台节点单独运行

    • rabbit@Jan
    • rabbit@Feb
  • 在每台机器上开启federation相关插件

    rabbitmq-plugins enable rabbitmq_federation --offline
    rabbitmq-plugins enable rabbitmq_federation_managemen --offline
    
  • 开启后在管理台页面发现新增选项卡

    在这里插入图片描述

  • 运行ConsumerFeb的代码

    • 通过ConsumerFeb代码创建了fed-queuefed-exchange
    /**
     * Feb 消息消费者
     */
    public class ConsumerFeb {
    
        private static final String  QUEUE_NAME="fed-queue";
        private static final String  EXCHANGE_NAME="fed-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("172.16.140.131");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
    
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"aaa");
            
            System.out.println("等待接收消息");
    
            //推送的消息如何进行消费的接口回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
              String result = new String(message.getBody());
                System.out.println("消费者接收到消息,消息内容为:"+result);
            };
    
            //取消消费的一个回调接口
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
    
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    
  • Feb添加upstream

在这里插入图片描述

  • 添加policy

    在这里插入图片描述

  • 查看federation status

    在这里插入图片描述

  • 添加ConsumerJan消费者代码

    /**
     *  Jan 消息消费者
     */
    public class ConsumerJan {
    
        private static final String  QUEUE_NAME="federation-queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("172.16.140.130");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    
    
            System.out.println("等待接收消息");
    
            //推送的消息如何进行消费的接口回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
              String result = new String(message.getBody());
                System.out.println("消费者接收到消息,消息内容为:"+result);
            };
    
            //取消消费的一个回调接口
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
    
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
        }
    }
    
  • 添加生产者代码

    /**
     * federation-exchange-消息生产者
     */
    public class Producer {
    
        private static final String  EXCHANGE_NAME="fed-exchange";
    
        public static void main(String[] args) throws Exception {
            //创建一个连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("172.16.140.130");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
            //获取连接
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            String message = "hello world";
            channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8));
    
            System.out.println("消息发送完毕");
    
        }
    }
    
  • 启动消费者Feb,消费者Jan,再启动生产者

    • 发现生产者发送消息到Jan,通过federation将消息转发到Feb,在消费者Feb中得到输出

federation-queue

在这里插入图片描述

shovel

  • 开启插件

    rabbitmq-plugins enable rabbitmq_shovel --offline
    rabbitmq-plugins enable rabbitmq_shovel_management --offline
    

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

15.federation 的相关文章

  • TX2查看设备信息命令汇总

    内存 free m 系统内核 uname a CPU信息 lscpu USB设备 lsusb CPU占用情况 sudo apt get install htop htop 查看cudnn版本 xff1a cat usr include cu
  • 蓝牙:蓝牙协议

    蓝牙协议学习整理 xff08 一 xff09 蓝牙的概述 转自 xff1a https blog csdn net guoxiaolongonly article details 78414870 传送门 xff1a xff08 一 xff
  • Thinkphp 6.0数据库的时间查询

    本节课我们要单独学习一下时间的所有查询方式 xff0c 包括传统式 快捷方式和固定查询等 一 xff0e 传统方式 1 可以使用 gt lt gt 61 lt 61 来筛选匹配时间的数据 xff1b Db name 39 user 39 g
  • Java中基本数据类型和包装类型的区别

    1 包装类是对象 xff0c 拥有方法和字段 xff0c 对象的调用都是通过引用对象的地址 xff1b 基本类型不是 2 包装类型是引用的传递 xff1b 基本类型是值的传递 3 声明方式不同 xff1a 基本数据类型不需要new关键字 x
  • git diff如何退出

    git diff 对比两次文件修改了什么 但如何退出呢 xff1f 按q即可
  • 数据结构,计算机网络,数据库,计算机组成原理,操作系统有哪些好的网课值得推荐?

    大家好 xff0c 我是小林哥 作为自学CS过来的老学长 xff0c 看过中国mooc b站 网易云课堂很多视频 xff0c 期间踩了不少坑 xff0c 这次掏心掏肺前来跟分享下 xff0c 网上的资源是免费的 xff0c 但是找到质量好的
  • MATLAB中im2bw函数-将图像转换为二值图像

    matlab中DIP工具箱函数im2bw使用阈值 xff08 threshold xff09 变换法把灰度图像 xff08 grayscale image xff09 转换成二值图像 所谓二值图像 xff0c 一般意义上是指只有纯黑 xff
  • Ubuntu18.04使用RPLIDAR A2M12雷达出错的解决办法

    最近领导要我用A2M12雷达搞SLAM xff0c 但是用电脑连上这个雷达捣鼓了两三天才能够拿到数据 就把踩的坑记录一下 软硬件平台 Nvidia Jetson Nano xff08 4GB版本的 xff09 Ubuntu 18 04 报错
  • workerman 连接失败可能的原因

    刚开始使用workerman时很常见的一个问题是客户端连接服务端失败 原因一般如下 xff1a 1 服务器防火墙 包括云服务器安全组 阻止了连接 xff08 50 几率是这个 xff09 2 客户端和服务端使用的协议不一致 xff08 30
  • 排序算法:冒泡排序和选择排序的思路,区别与优缺点。

    一 xff0c 冒泡排序 xff1a 冒泡排序的定义就不提了 xff0c 总结起来就一句话 xff08 划重点 xff09 xff1a xff0c 从左到右 xff0c 数组中相邻的两个元素进行比较 xff0c 将较大的放到后面 算法思路
  • ROS创建功能包并自定义消息

    ROS有时需要自定义消息 xff0c 本文叙述如何通过创建功能包并自定义消息 创建ROS工作空间具体实现 xff1a https blog csdn net qq 34911636 article details 100103448 创建一
  • 卡尔曼滤波详细推导

    卡尔曼滤波 xff08 Kalman filtering xff09 是一种利用线性系统状态方程 xff0c 通过系统输入输出观测数据 xff0c 对系统状态进行最优估计的算法 xff0c 由于观测数据中包括系统中的噪声和干扰的影响 xff
  • ROS tf工具与消息查看命令

    TF工具坐标系统是一个基础理论 xff0c 但是涉及到多个空间的变换 xff0c 不容易进行想象所以TF工具给开发者调试提供很多方便 1 tf monitor xff1a 将当前的坐标系转换关系打印到终端控制台 rosrun tf tf m
  • melodic 打开gazebo出现[Err] [REST.cc:205] Error in REST request错误解决方法

    ROS melodic版本下打开gazebo出现 Err REST cc 205 Error in REST request错误解决方法 输入以下命令打开文件 sudo gedit ignition fuel config yaml 然后将
  • 技术资源汇总(一)

    1 Ubuntu技术论坛 xff1a https askubuntu com 2 树莓派资源 https www yahboom com study raspberry3B 密码 xff1a cf0p 汇总资料提取码 xff1a hdy7
  • docker常用命令

    1 配置docker阿里云镜像 1 打开daemon json文件 xff08 若没有此文件 xff0c 则创建 etc docker daemon json xff09 xff1a vi etc docker daemon json 2
  • 网络调试助手UDP广播问题

    用直接广播地址 xff08 192 168 xxx 255 端口 xff09 可以进行广播 xff1b 用受限广播地址 xff08 255 255 255 255 端口 xff09 显示没有指定有效的远程主机端口 xff0c 搞了好久发现是

随机推荐

  • “平衡小车之家”家的STM32F103最小系统源代码分享

    在网上寻找了好久 xff0c 因为他家的开发板自带有mpu6050模块 故想测试其精准度以及z轴漂移程度 发现也有很大的漂移 代码如下 main c部分 xff1a span class token macro property span
  • 使用PMW3901和VL53L1X 实现室内定点悬停

    使用PMW3901和VL53L1X 实现室内定点悬停 使用PMW3901 光流传感器进行水平方向定位Pixhawk连接PMW3901传感器PX4源代码加入PMW3901驱动后重新编译QGroundControl中的配置 使用气压计和VL53
  • 使用 QGroundControl 地面站更新 PixHawk飞控的Bootloader

    安装最新版本的PX4固件 启动QGroundControl并且使用USB连接到Pixhawk飞控 选择 Q icon gt Vehicle Setup gt Firmware sidebar 打开固件设置 安装最新版本的PX4固件 更新Bo
  • 自制DIY 机器狗 完全教程 - MIT猎豹Cheetah

    自制DIY 机器狗 完全教程 MIT猎豹Cheetah 背景结构设计模块化关节电机性能考虑关节结构 四足平台设计腿部设计身体设计脚部设计 硬件设计关节驱动器通信总线板供电系统 控制系统人工智能 背景 3年前 xff0c MIT开源了世界上跑
  • centos安装wxWidgets,erlang,RabbitMq

    centos安装wxWidgets erlang RabbitMq 默认已经安装了java环境 而安装RabbitMq需要安装erlang xff0c 安装erlang又需要安装wxWidgets 安装wxWidgets 更新系统 yum
  • 2.rabbitmq概述和helloworld

    rabbitmq概述 rabbitmq中的几个概念 BROKER 接收和分发消息的应用 xff0c RabbitMQ Server 就是 Message Broker Virtual Host 出于多租户和安全因素设计的 xff0c 把 A
  • 3.rabbitmq轮询和不公平分发

    rabbitmq轮询和不公平分发 rabbitmq轮询分发 rabbitmq默认是使用轮询来分发消息的 测试代码如下所示 生产者代码 span class token comment 生产者 task rabbitmq 轮询演示 span
  • 4.rabbitmq消息应答

    rabbitmq消息应答 概述 消息应答就是消费者在收到消息的时候 xff0c 在它接收到消息并处理完毕之后 xff0c 告诉rabbitmq它已经处理完了 xff0c rabbitmq可以删除这个消息了 消息应答的方式 channel b
  • 5.rabbitmq持久化

    rabbitmq持久化 队列的持久化 队列的持久化需要我们在声明的时候指定其持久化 使用durable 61 true来持久化队列 span class token comment 队列的持久化 span span class token
  • 关于双控阵列的实现原理的讨论

    xfeff xfeff http bbs chinaunix net forum viewthread tid 4140392 html 对于一个支持FC SAN的双控存储阵列 xff0c 对外号称active active xff0c 实
  • 6.rabbitmq中exchange的几种形式

    rabbitmq中exchange的几种形式 RabbitMQ 消息传递模型的核心思想是 生产者生产的消息从不会直接发送到队列 实际上 xff0c 通常生产 者甚至都不知道这些消息传递传递到了哪些队列中 相反 xff0c 生产者只能将消息发
  • 7.rabbitmq死信和死信队列

    rabbitmq死信和死信队列 概述 先从概念解释上搞清楚这个定义 xff0c 死信 xff0c 顾名思义就是无法被消费的消息 xff0c 字面意思可以这样理 解 xff0c 一般来说 xff0c producer 将消息投递到 broke
  • 8.rabbitmq发布确认

    rabbitmq发布确认 生产者将信道设置成 confirm 模式 xff0c 一旦信道进入 confirm 模式 xff0c 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID 从 1 开始 xff0c 一旦消息被投递到所有匹配的队
  • 9.延迟队列

    延迟队列 延迟队列的概念 延时队列 队列内部是有序的 xff0c 最重要的特性就体现在它的延时属性上 xff0c 延时队列中的元素是希望 在指定时间到了以后或之前取出和处理 xff0c 简单来说 xff0c 延时队列就是用来存放需要在指定时
  • 10.回退消息

    rabbitmq回退消息 mandatory参数 在仅开启了生产者确认机制的情况下 xff0c 交换机接收到消息后 xff0c 会直接给消息生产者发送确认消息 xff0c 如果发现该消息不可路由 xff0c 那么消息会被直接丢弃 xff0c
  • 11.备份交换机

    备份交换机 概念 当交换机收到一条不可路由消息时 xff0c 将会把这条消息转发到备份交换机中 xff0c 由备份交换机来进行转发和处理 xff0c 通常备份交换机的类型为fanout xff0c 这样就能把所有消息都投递到与其绑定的队列中
  • 12.优先级队列和惰性队列

    优先级队列 如何添加优先级 选择Maximum priority xff0c 指定优先级的数值 xff0c 设定范围为0 255 xff0c 如果值为10 xff0c 那么就是0 10 xff0c 最大不能超过255 代码形式 span c
  • 13.rabbitmq集群搭建

    rabbitmq集群搭建和镜像队列 集群搭建 准备三台服务器 172 16 140 133 Jan172 16 140 132 Feb172 16 140 133 Mar 修改3台机器的hosts文件 span class token fu
  • 14.haproxy+keepalived负载均衡和高可用

    haproxy 43 keepalived负载均衡和高可用 概述 多个rabbitmq服务形成集群 xff0c 由haproxy来做负载均衡 xff0c haproxy会暴露出来一个端口 xff0c 客户端可以通过haproxy所在的服务器
  • 15.federation

    federation和shovel federation exchange 问题的由来 xff1a 城市A有rabbitmqA xff0c 城市B有rabbitmqB xff0c 当城市B的应用要发消息到exchangeA的时候 xff0c