RabbitMQ交换机(扇出模式、直接模式)学习笔记

2023-11-15

视频地址

什么是交换机?

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。

情况实际上是这样的,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

交换机的类型

  • 直接(direct),
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)

临时队列

我们前面没有指定交换机的类型的时候都是使用的默认交换机,后面我们使用自定义的交换机的,我们需要将交换机和队列进行绑定(我们之前的ack_queue,hello),实际上我们还可以使用自动生成的队列,自动生成的队列在连接断开的时候会自动删除。

/*生成一个临时的队列
        队列的名称是随机的
        消费者断开与队列断开连接的时候就会自动删除队列
         */
        String queueName = channel.queueDeclare().getQueue();

        /*
        将队列和交换机进行绑定
        1:队列名称
        2:交换机名称
        3:绑定关系的名称
         */
        channel.queueBind(queueName,exchangeName,"");

这样我们通过一个交换机,多个队列就可以实现将一个任务被多个消费者进行消费,只要这些消费者消费的是不同队列中的消息即可。

扇出类型

代码如下

生产者:

package com.dongmu.PublishAndSubscribe;

import com.dongmu.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Producer {
    private static final String exchangeName = "logs";
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtil.getChannel();

        //设置交换机的工作方式
        channel.exchangeDeclare(exchangeName,"fanout");
        Scanner scanner = new Scanner(System.in);

        System.out.println("请输入需要发送的消息:");

        while (scanner.hasNext()){
            String next = scanner.next();

            /*
            1:发送到哪一个交换机
            2:路由的key
            3:其他的参数信息
            4:信息的消息体
             */
            channel.basicPublish(exchangeName,"",null,next.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息'"+next+"'完成。");
        }



    }
}

消费者1

package com.dongmu.PublishAndSubscribe;

import com.dongmu.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;


/**
 * RabbitMQ的发布订阅模式
 */
public class Consumer1 {

    private static final String exchangeName = "logs";

    public static void main(String[] args) throws Exception {

        //首先获取信道
        Channel channel = RabbitMQUtil.getChannel();

        //声明一个交换机,
        /*
         * 1:交换机的名称
         * 2:交换机的类型(fanout表示的扇出类型,就是发布订阅模式)
         */
        channel.exchangeDeclare(exchangeName,"fanout");


        /*生成一个临时的队列
        队列的名称是随机的
        消费者断开与队列断开连接的时候就会自动删除队列
         */
        String queueName = channel.queueDeclare().getQueue();

        /*
        将队列和交换机进行绑定
        1:队列名称
        2:交换机名称
        3:绑定关系的名称
         */
        channel.queueBind(queueName,exchangeName,"");

        DeliverCallback deliverCallback = (name, delivery)->{
            System.out.println("消费者接收到了消息:"+name+",消息的内容是:"+new String(delivery.getBody(), StandardCharsets.UTF_8));
        };

        channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});

    }

}

消费者2和消费者1代码相同

结果:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

直接模式

上面的扇出模式中,我们使用的交换机和队列进行绑定的时候有一个绑定的routingkey,上面我们写的是一个空串。实际上我们可以进行绑定不同的routingkey,同一个队列可以绑定不同的routingkey。

我们只需要把消息发送到某一个交换机指定它的路由即可,对应的包含这个路由的队列会收到这个消息。

代码

package com.dongmu.direct;

import com.dongmu.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Producer {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "北海冬木";
        Scanner scanner = new Scanner(System.in);
        channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes(StandardCharsets.UTF_8));
        while (scanner.hasNext()){
            message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息"+message);
        }
    }

}

package com.dongmu.direct;

import com.dongmu.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queuename = "consule";
        String routingkey1 = "info";
        String routingkey2 = "warning";

        channel.queueDeclare(queuename,false,false,false,null);

        channel.queueBind(queuename,EXCHANGE_NAME,routingkey1);
        channel.queueBind(queuename,EXCHANGE_NAME,routingkey2);

        DeliverCallback callback_ack = (queueName,message)->{
            System.out.println("消费了队列"+queueName+"中的消息,消息的内容是:"+new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback callback_nack = (queueName)->{
            System.out.println("消费队列"+queueName+"中的消息失败。");
        };

        channel.basicConsume(queuename,true,callback_ack,callback_nack);


    }
}

package com.dongmu.direct;

import com.dongmu.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class Consumer2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queuename = "disk";
        String routingkey = "error";

        channel.queueDeclare(queuename,false,false,false,null);

        channel.queueBind(queuename,EXCHANGE_NAME,routingkey);

        DeliverCallback callback_ack = (queueName,message)->{
            System.out.println("消费了队列"+queueName+"中的消息,消息的内容是:"+new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback callback_nack = (queueName)->{
            System.out.println("消费队列"+queueName+"中的消息失败。");
        };

        channel.basicConsume(queuename,true,callback_ack,callback_nack);


    }
}

当我们先把两个消费者启动,然后启动生产者,由于生产者会首先发送一个“冬木”到error的routingKey,所以只有consumer2里面的队列能够收到。这个时候只有consumer2的控制台能够看到一下内容
在这里插入图片描述
如果我们在producer中发送下面的内容
在这里插入图片描述

由于producer把它发送到了绑定routingkey是info的队列的当中,因此这个时候只有consumer1能够消费到这个消息,并且只能消费到一条消息。
在这里插入图片描述
如果我们在生产者的代码写成下面这个样子

channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));

这个时候由于consumer1里面的队列绑定了两个routingkey,也就是会路由到这个队列中两次消息,所以就会消费两次。

另外需要注意的是我们到现在为止,编写的代码都是在消费者中将队列和交换机进行绑定的,其实这样子不太好,我们必须先启动消费者然后再启动生产者。如果我们先启动的是生产者然后再启动消费者,消费者就会无法消费到启动之前的消息,因为启动时候的生产者没有将交换机和队列进行绑定,然后交换机又是不能存储消息的,这就导致了消息的丢失,所以如果在实战中是使用的时候我们还是尽量在生产者中将交换机和队列进行绑定。
另外可能你会发现你自己做实验的结果不是这样的,这是由于你可能没有在实验之前将已经存在的交换机和队列进行删除,导致生产者启动的时候,交换机和队列就已经绑定在了一起。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ交换机(扇出模式、直接模式)学习笔记 的相关文章

  • Spring控制器是线程安全的吗

    我遇到了这个控制器示例 想知道它是否是线程安全的 我特别想知道 gson 实例变量 import org springframework stereotype Controller import org springframework we
  • Android - 如何访问 onResume 中 onCreate 中实例化的 View 对象?

    In my onCreate 方法 我正在实例化一个ImageButton View public void onCreate Bundle savedInstanceState super onCreate savedInstanceSt
  • 如何在 Eclipse 中用阿拉伯语读写

    我在 eclipse 中编写了这段代码来获取一些阿拉伯语单词 然后打印它们 public class getString public static void main String args throws Exception PrintS
  • 检查双精度值的等于和不等于条件

    我在比较两者时遇到困难double values using and 我创建了 6 个双变量并尝试进行比较If健康 状况 double a b c d e f if a b c d e f My code here in case of t
  • Java 创建浮雕(红/蓝图像)

    我正在编写一个 Java 游戏引擎 http victoryengine org http victoryengine org 并且我一直在尝试生成具有深度的 3D 图像 您可以使用那些红色 蓝色眼镜看到 我正在使用 Java2D 进行图形
  • 由于连接超时,无法通过 ImageIO.read(url) 获取图像

    下面的代码似乎总是失败 URL url new URL http userserve ak last fm serve 126 8636005 jpg Image img ImageIO read url System out printl
  • 无法加载 jar 文件的主类

    我使用 Eclipse IDE 开发了一个应用程序 创建应用程序后 我以 jar 格式导出项目 当我尝试运行此 jar 文件时 出现错误 无法加载主类 请帮忙 当您将项目导出为 jar 时 请参阅此所以问题 https stackoverf
  • 有没有好的方法来解析用户代理字符串?

    我有一个Java接收模块User Agent来自最终用户浏览器的字符串的行为需要略有不同 具体取决于浏览器类型 浏览器版本甚至操作系统 例如 FireFox 7 0 Win7 Safari 3 2 iOS9 我明白了User Agent由于
  • Google Inbox 类似 RecyclerView 项目打开动画

    目前 我正在尝试实现 Google Inbox 例如RecyclerView行为 我对电子邮件打开动画很好奇 我的问题是 该怎么做 我的意思是 他们使用了哪种方法 他们用过吗ItemAnimator dispatchChangeStarti
  • spring - 强制 @Autowired 字段的 cglib 代理

    我有混合堆栈 EJB 和 Spring 为了将 Spring 自动装配到 EJB 我使用SpringBeanAutowiringInterceptor 不确定这是否会影响我遇到的问题 在尝试通过以下方式自动装配 bean 时 Scope p
  • 场景生成器删除 fxml 文件中的导入

    我使用场景构建器 Gluon Scene Builder JavaFX Scene Builder 8 1 1 来创建应用程序的 UI 并使用 Eclipse 开发 JavaFX 现在 每次我在场景生成器中保存某些内容时 它都会从 fxml
  • 使用 java 按电子邮件发送日历邀请

    我正在尝试使用 java 发送每封电子邮件的日历邀请 收件人收到电子邮件 但不会显示接受或拒绝的邀请 而是将该事件自动添加到他的日历中 我正在使用 ical4j jar 构建活动 邀请 private Calendar getInvite
  • @EnableTransactionManagement 的范围是什么?

    我试图了解正确的放置位置 EnableTransactionManagement多个 JavaConfig 上下文的情况下的注释 考虑以下场景 我在 JPAConfig java 和 AppConfig java 中有 JPA 配置以及一组
  • 参数动态时如何构建 JPQL 查询?

    我想知道是否有一个好的解决方案来构建基于过滤器的 JPQL 查询 我的查询太 富有表现力 我无法使用 Criteria 就像是 query Select from Ent if parameter null query WHERE fiel
  • 为什么 ConcurrentHashMap::putIfAbsent 比 ConcurrentHashMap::computeIfAbsent 更快?

    使用 ConcurrentHashMap 我发现computeIfAbsent 比putIfAbsent 慢两倍 这是简单的测试 import java util ArrayList import java util List import
  • 从 html 页面和 javascript 调用 java webservice

    我正在尝试从 javascript 调用 java 实现的 Web 服务 使用 NetBeans IDE 我读过很多关于 jQuery 和 AJAX 的内容 但我似乎无法掌握它 假设我的 Web 服务 WSDL 位于 http localh
  • 如何使用 Mockito 和 Junit 模拟 ZonedDateTime

    我需要模拟一个ZonedDateTime ofInstant 方法 我知道SO中有很多建议 但对于我的具体问题 到目前为止我还没有找到任何简单的解决办法 这是我的代码 public ZonedDateTime myMethodToTest
  • 如何在android sdk上使用PowerMock

    我想为我的 android 项目编写一些单元测试和仪器测试 然而 我遇到了一个困扰我一段时间的问题 我需要模拟静态方法并伪造返回值来测试项目 经过一些论坛的调查 唯一的方法是使用PowerMock来模拟静态方法 这是我的 gradle 的一
  • struts 教程或示例

    我正在尝试在 Struts 中制作一个登录页面 这个想法是验证用户是否存在等 然后如果有错误 则返回到登录页面 错误显示为红色 典型的登录或任何表单页面验证 我想知道是否有人知道 Struts 中的错误管理教程 我正在专门寻找有关的教程 或
  • Path2D 上的鼠标指针检测

    我构建了一个Path2D http docs oracle com javase 7 docs api java awt geom Path2D html表示由直线组成的未闭合形状 我希望能够检测何时单击鼠标并且鼠标指针靠近路径 在几个像素

随机推荐

  • Matlab实现自适应动态规划多层神经网络的算例汇总

    使用MATLAB实现自适应动态规划 ADP 多层神经网络的算例 包括扭摆系统 仿射非线性算例以及 质量 弹簧 阻尼 系统 扭摆系统 torsional pendulum system 文献出处 1 Liu D Wei Q Policy It
  • 大数据模型部署思路

    提出问题 以神经网络为例 MATLAB可以用训练集来训练数据 随后用测试集来检测模型准确度 最后用该模型来决策新数据 在大数据平台下如何训练数据呢 又如何用训练好的模型对新数据进行决策呢 方法1 spark自带机器学习库mlib 用原生机器
  • bootstrap-table按住Shift多选设计思路-优化

    在网上找到bootstra table按住shift实现多选的思路后 进行改进 实现批量选中 批量取消选中 批量选中后取消部分选中并保留剩下选中项 向上 向下批量操作 参考地址 https blog csdn net qq 36360308
  • 计算机二级Java(第一份)

    1 1 第三范式是在第二范式的基础上消除了 非主属性对键的传递函数依赖 第三范式是为了消除数据库中关键字之间的依赖关系 2 冯诺依曼 输入 存储 运算 控制 输出 3 一般情况下 划分计算机四个发展阶段的主要依据是 计算机所采用的基本元器件
  • PPTP协议简述

    http jiaoyu 3158 cn 20121128 n4230270592726 html PPTP Point to Point Tunnel Protocol 点对点隧道协议 是建立在PPP Point to Point 点对点协
  • c#泛型

    参考博文 http www cnblogs com 1175429393wljblog p 5519701 html 泛型类型参数 其是一个占位符 不是一个真正的类型 而更像是一个类型的蓝图 当我们在声明 实例化该类型的变量时 则把T改为指
  • c#中字符串编码方式的转换,附带程序uft8到gb2312的互换

    前面做一个基于sybase的mis系统 由于sybase的后台是cp850编码 而 net平台不支持cp850编码 所以在程序中所有从数据库读出的中文都显示为 于是考虑在 net 平台中转换字符编码 于是查看了 net中字符编码的类Syst
  • error C4996: ‘strcpy‘: This function or variable may be unsafe. Consider using strcpy_s instead.

    Hello everyone 我是鲁班 一个热衷于科研和软开的胖子 出现这个错误时 是因为strcpy函数不安全造成的溢出 解决方法是 找到 项目属性 点击 C 里的 预处理器 对 预处理器 进行编辑 在里面加入一段代码 CRT SECUR
  • Error:Cannot build artifact xxx:war exploded' because it is included into a circular dependency

    Error Cannot build artifact xxx war exploded because it is included into a circular dependency 解决方案 IDEA 项目报错 Error Cann
  • python遍历目录压缩文件夹_干货

    如何遍历查找出某个文件夹内所有的子文件呢 并且找出某个后缀的所有文件 walk功能简介 os walk 方法可以用于在文件目录中进行查找和遍历操作 os walk 方法是一个简单易用的文件 目录遍历器 可以帮助我们高效的处理文件 目录方面的
  • 目标检测:Generalized Focal Loss(NIPS2020)

    Generalized Focal Loss Learning Qualified and Distributed Bounding Boxes for Dense Object Detection CCF A Xiang Li Wenha
  • Linux下查看tomcat占用端口

    1 先查看tomcat的进程号 ps ef grep tomcat 后面带 号 是为了查看多个tomcat进程 例如tomcat6 tomcat7 解释命令 ps 查看当前系统进程状态 可以搭配kill指令随时中断 删除不必要的程序 A 显
  • 高校俱乐部参会成员餐票领取安排及第二届研讨会邀请信

    尊敬的高校俱乐部合作伙伴 您好 感谢您参加移动开发者大会 今天下午电子门票已全部发出 请按照门票信息到国家会议中心签到处进行门票注册 北京周五和周六的天气温和 天气预报显示不会下雨 气温区间20 10摄氏度 需要一件外套 请在入场后到附件地
  • 《我的眼睛--图灵识别》附录

    我的眼睛 图灵识别 附录 1 远程代答 远程代答系统 答的是什么呢 答的便是出来以久的验证图形码 在互联网快速发展的时代 很多应用平台层出不穷 验证图形码的种类也从原来的简单清晰的数字变成了模糊的字母 甚至大小写字母数字组合 还有的是简单的
  • 素数环圈——回溯法

    任务描述 本关任务 把从 1 到 n 这 n 个数摆成一个环 要求相邻的两个数的和是一个素数 测试输入 20 预期输出 围成的圈是 1 2 3 4 7 6 5 8 9 10 13 16 15 14 17 20 11 12 19 18 1 n
  • ubuntu 上装win xp

    如果ubuntu不想用 想换成win xp系统 但ubuntu并非卸就直接装xp了 开机后却无法正常进入xp系统 解决方法 1 先到网上下载MbrFix exe http www linuxidc com upload 2007 11 07
  • 你为什么没有男朋友

    一月 放寒假 买票回家 小A托老乡学长代买 请饭答谢 一来二去 学长遂成男友 一月 放寒假 你自己半夜去车站排队 然后 就没有然后了 二月 同学聚会 小B与当年暧昧的男同学的再续前缘 老同学遂成男友 二月 你在家帮你妈做卫生 然后 就没有然
  • Stream流的常用方法

    目录 一 遍历 1 属性遍历 2 对象遍历 3 遍历设值 二 过滤 1 简单过滤 2 多条件过滤 三 去重 四 统计 五 截取 六 跳过 七 排序 1 普通排序 2 指定排序 八 最值 1 集合确定不为空 2 集合可能为空 九 统计 十 L
  • 织梦DEDECMS EXCEL数据批量导入文章插件 支持自定义模型和字段 2021/09/13更新

    插件介绍 本插件为织梦EXCEL批量导入数据插件 支持tag导入 目前只有UTF版本 GBK版本自行转码 默认只支持普通文章模型 不包括自定义字段 需要另外模型或者添加新字段的可以联系客服收费修改 支持自定义字段和自定义模型 使用前 请先备
  • RabbitMQ交换机(扇出模式、直接模式)学习笔记

    视频地址 什么是交换机 RabbitMQ 消息传递模型的核心思想是 生产者生产的消息从不会直接发送到队列 实际上 通常生产 者甚至都不知道这些消息传递传递到了哪些队列中 情况实际上是这样的 生产者只能将消息发送到交换机 exchange 交