RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

2023-11-12

RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

发布者/订阅者 模型如下:
在这里插入图片描述

他与前面两个小案例最大的区别就是,他的消息不是阅完即焚的。他允许将同一条消息发送给多个消费者。而实现此操作的原因是加入了我们的交换机(exchange)。

在发布者和订阅者的模型中,各个组件的功能如下

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Queue:消息队列也与以前一样,接收消息、缓存消息。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

注意:交换机他只负责消息的转发,并不存储消息,如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!!

OK,这么解释肯定是不够的,下面我们就来说一下第一种交换机类型——Fanout(广播)在Java中的具体使用方式

写法一、配置类配置方式

步骤一、在消费者服务中,利用代码声明队列、交换机,并将两者进行绑定。
SpringAMQP提供的**交换机(Exchange)、队列(Queue)、绑定(Binding)**的API如下:
在这里插入图片描述
要将我们的队列绑定到交换机,我们需要编写我们的配置类如下:

package com.demo.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     *  声明FanoutExchange(广播交换机)
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        //交换机的名称
        return new FanoutExchange("exchange.fanout");
    }

    /**
     *  声明第一个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     *  声明第二个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     *  绑定 队列1 到 交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     *  绑定 队列2 到 交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

写完配置类,我们重启一下我们的消费者服务类,然后我们到RabbitMQ上看一下我们的交换机和队列。

可以看到,确实多了一个交换机叫 exchange.fanout。
在这里插入图片描述

我们再看一下队列,可以看到,我们两个队列也都注册成功了。
在这里插入图片描述
点击我们刚才新增的交换机,打开它的Bindings,可以看到这个交换机他告诉我们,他的消息是会转发到 fanout.queue1 和 fanout.queue2中:
在这里插入图片描述

ok,我们接着往下写:

**步骤二、在消费者服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2。 **.

监听的方法,现在应该已经写得滚瓜烂熟了吧,这里就直接贴代码了。
1、编写的类记得加 @Component 将这个监听的类注册到 Spring容器中。
2、监听哪个queue,那么就写对应的方法,并在方法上方添加@RabbitListener注解,用queues属性标明要监听的queue即可。(如果有多个,那么用 @RabbitListener(queues = {“queueName1”, “queueName2”})表示即可。

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
    }
}

步骤三、在发布者服务中,编写测试方法,向交换机 exchange.fanout 发送消息。

    @Test
    public void testFanoutExchange(){
        //交换机名称
        String exchangeName = "exchange.fanout";
        //消息
        String msg = "Hello,av8d!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }

这里的rabbitTemplate.convertAndSend接受三个参数,分别是

public void convertAndSend(String exchange,
                           String routingKey,
                           Object object)
  1. exchange:交换机的名称
  2. routeKey:routeKey值(还不需要用到,先不管他,给个"")
  3. object:发送的消息

写完测试方法,我们跑一下我们的测试方法,然后看一下我们消费者的控制台如下:
在这里插入图片描述
可以看到,只发布了一条消息,但是通过交换机发布给两个Queue后,我们消费者的两个方法都监听到了我们同一条消息。

写法二、注解方式(@RabbitListener)

如果以前尝试了上面的写法,记得把配置类的 @Configuration 注释掉

//@Configuration
public class FanoutConfig {
...
}

然后把刚才写的两个方法注释掉。

/**
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
    }
**/

接下来,我们就开始写我们使用注解声明队列的方法。

步骤一、配置我们的RabbitMQ。(只要使用RabbitMQ,都必须要配置)

spring:
  rabbitmq:
    host: 192.168.83.134
    port: 5672
    virtual-host: /
    username: admin
    password: root
    listener:
      simple:
        prefetch: 1

步骤二、直接写我们的监听方法。(使用@RabbitListener注解写我们的路由方式、路由名称以及我们的队列名即可)

@Component
public class SpringRabbitListener {
	@RabbitListener(bindings = @QueueBinding(
	        value = @Queue(name = "fanout.queue1"),
	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
	))
	public void listenFanoutQueue1(String msg){
	    System.out.println("监听到 fanout.queue1 的消息为:【" + msg+"】");
	}
	
	@RabbitListener(bindings = @QueueBinding(
	        value = @Queue(name = "fanout.queue2"),
	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
	))
	public void listenFanoutQueue2(String msg){
	    System.out.println("监听到 fanout.queue2 的消息为:【" + msg+"】");
	}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange) 的相关文章

  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • 使用 Celery 时出现错误消息“无法找到记录器“多处理”的处理程序”

    RabbitMQ http en wikipedia org wiki RabbitMQ现在似乎工作正常 然而 当我尝试 python m celery bin celeryd loglevel INFO 常规的celeryd不起作用 我收
  • java中可以有switch(java.lang.Object)吗?

    我的应用程序需要有类型的 switch case 语句String 我需要这样的东西 Object list1 list1 Object list2 list2 Object list3 list3 Object option list1
  • 为什么java中的BigInteger被设计成不可变的?

    在 java 中 BigInteger 是不可变的 但我想了解为什么 因为很多时候它用于进行大量计算 从而产生大量对象 所以 不让它变得不可变感觉有点直观 我想到的情况类似于字符串操作 然后是 StringBuilder 的选项 是否应该有
  • JTextPane 的样式是否具有类似控制台的格式?

    有没有办法使 JTextPane 中的文本看起来与控制台输出的文本相似 我的意思是 基本上 每个字符如何具有相同的宽度 以便 ASCII 艺术或间距缩进之类的东西可以正常工作 例如 目前 如果我输入 First 然后输入 5 个空格 然后在
  • 什么时候数据库被称为嵌入式数据库?

    术语 嵌入式数据库 与 数据库 具有不同的含义吗 我见过的嵌入式数据库有两种定义 嵌入式数据库就像专门为 嵌入式 空间 移动设备等 设计的数据库系统一样 这意味着它们在紧张的环境中 内存 CPU 方面 可以合理地执行 嵌入式数据库就像不需要
  • 从不同 JVM 中的 Java 桌面应用程序中执行 Java main 方法

    我有一个桌面应用程序 当有人按下按钮时 我希望它启动另一个执行类的 main 方法的 JVM 我的桌面应用程序已经依赖于包含具有我想要执行的 main 方法的类的 jar 目前我有以下代码 但是 我希望它们是一种更优雅的方法 Runtime
  • JavaFX TabPane 禁用按键切换选项卡

    我有一个Tab有一些内容 ScrollBar和别的 The ScrollBar has event handler for keys left and right 但如果我按下这些按钮Tabs被切换 因为TabPane还有一个密钥处理程序
  • 如何从 Android 服务获取应用程序上下文?

    我有一个正在运行并监听麦克风输入的 Android 服务 我希望它在满足特定条件时启动一项活动 为了创建意图 我需要应用程序上下文 我怎么才能得到它 Intent i new Intent ctx SONR class i addFlags
  • 如何在 JavaFX 中设置滚动窗格的单位增量?

    The 滚动条 http docs oracle com javafx 2 api javafx scene control ScrollBar htmlJavaFX 中的类包含一个用于设置单位增量的属性 这就是我所追求的 但是我找不到如何
  • Motorola Android 2.2 相机忽略 EXTRA_OUTPUT 参数

    我以编程方式打开相机来拍摄视频 我告诉相机使用如下代码将视频文件放置到指定位置 Intent intent new Intent MediaStore ACTION VIDEO CAPTURE File out new File sdcar
  • 在 Volley 中更新 UI 最有效的方法是什么

    最近我在 android 中使用 Volley 库 它工作得很好 但我想知道更新 UI 的最有效方法 我有一个包含所有 Volley 方法的 Utils 类 现在我传递了所有视图将作为参数更新 但我读到我可以在活动中实现侦听器 然后将它们作
  • JavaFX 多线程 - 连接线程不会更新 UI

    我正在尝试创建一个加载程序对话框 用户可以在其中知道程序正在加载所请求的内容并且程序正在按预期运行 但正因为如此 我需要join 解析器线程和之前继续主线程 这使得对话框空白 解析器任务 java public class ParserTa
  • R:连接到 Teradata 时 JDBC() 找不到 Java 驱动程序路径

    我正在尝试通过 RStudio 连接到 Teradata 但由于某种原因 JDBC 函数在识别 Java 驱动程序所在的路径时出现问题 请参阅下面的代码 library RODBC library RJDBC library rJava b
  • Apache James 学习资源 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 该计划是创建一个列表阿帕奇 詹姆斯 http james apache org学习资源 涉及从设置到使用
  • 为自定义 userdetailsservice 定义 bean

    我如何定义我的自定义UserDetailsServicebean 的方式使我的 spring mvc Web 应用程序能够使用我的底层 MySQL 数据库来检查用户和密码的身份验证 具体如下 我正在添加安全性spring petclinic
  • Java 8 Streams - 嵌套映射到列表

    firstlist stream map x gt return secondList stream map y gt return a string collect Collectors toList Output I need Get
  • 手写签名对比

    有谁知道java中一种将手写文本样本 例如签名 亲笔签名等 与一个或多个样本进行比较的方法 最好是开源的 你可以看看这个OCR小程序 http www heatonresearch com articles 42 page1 html
  • 信号量如何工作?

    信号量可以小于0吗 我的意思是 假设我有一个 N 3 的信号量 并且我调用 down 4 次 那么 N 将保持为 0 但一个进程将被阻塞 反之亦然 如果一开始我调用 N 可以大于 3 吗 因为在我看来 如果 N 可以高于 3 如果一开始我调
  • web.xml 过滤器如何工作?您可以将两个过滤器映射到所有页面 (/*) 并指定顺序吗?

    我想使用 Spring Security 它说将过滤器映射到 但我已经有一个映射到 的过滤器 即 Tuckee URLRewrite 过滤器 是否可以将两个过滤器映射到同一事物 并且有没有办法指定过滤器的调用顺序 是的你可以 servlet

随机推荐

  • sEnginScriptError Component is not found in path "components/..."

    小程序开发报错 sEnginScriptError Component is not found in path components cesh using by pages playlist playlist onAppRoute Err
  • 更换JDK版本不生效的问题解决方案

    1 首先检查环境变量是否修改完成 环境变量配置 PATH JAVA HOME bin JAVA HOME C Program Files Java jdk1 8 1 131 选择自己电脑上JDK的安装路径 2 输入 javac 检查JDK是
  • 力扣1342. 将数字变成 0 的操作次数(java+python)

    给你一个非负整数 num 请你返回将它变成 0 所需要的步数 如果当前数字是偶数 你需要把它除以 2 否则 减去 1 示例 1 输入 num 14 输出 6 解释 步骤 1 14 是偶数 除以 2 得到 7 步骤 2 7 是奇数 减 1 得
  • java Field.canAccess 和 Field.isAccessible

    如果 Field isAccessible 已经过时 则使用 Field canAccess 官方Api boolean Field canAccess Object obj Object obj an instance object of
  • Invalid character found in the request target.The valid characters are defined in RFC 7230 and RFC39

    背景 在将tomcat升级到7 0 81版后 发现系统的有些功能不能使用了 查询日志发现是有些地址直接被tomcat认为存在不合法字符 返回HTTP 400错误响应 错入信息如下 原因分析 经了解 这个问题是高版本tomcat中的新特性 就
  • Intellij IDEA 2019无法联网,无法下载插件问题解决办法

    Intellij IDEA 2019无法联网 无法下载插件问题解决办法 非法不多说直接上图 第一步 第二步 2019 2 之后没有这一步 第三步 记得重启哦 lt 完 gt
  • 代理服务器列表(20100116)

    经测试 今天发布的代理服务器全部都是免费可用的 211 152 11 30 80 69 119 28 234 8085 72 196 11 73 8085 222 66 116 110 8080 41 223 143 16 8080 122
  • 5.1劳动节,Happy May Day!(为什么要调休啊?)

    国际劳动节又称 五一国际劳动节 国际示威游行日 英语 International Workers Day May Day 是世界上80多个国家的全国性节日 定在每年的五月一日 它是全世界劳动人民共同拥有的节日 1889年7月 由恩格斯领导的
  • Unity中的宏定义

    有时候我们需要使用区分不同平台来实现不同的逻辑 这个时候就用到宏定义了 基本语法 if UNITY EDITOR WIN UNITY STANDALONE elif UNITY ANDROID else endif 宏定义可以直接写在类中
  • 基于STM32的正点原子LORA模块通信网络

    LoRa是semtech公司开发的一种低功耗局域网无线标准 其名称 LoRa 是远距离无线电 Long Range Radio 它最大特点就是在同样的功耗条件下比其他无线方式传播的距离更远 实现了低功耗和远距离的统一 它在同样的功耗下比传统
  • 解决“yarn : 无法加载文件 C:\Users\quber\AppData\Roaming\npm\yarn.ps1,因为在此系统上禁止运行脚本”问题

    我们在使用yarn命令的时候 可能会出现如下图所示的错误 出现此错误的原因是本地计算机上运行你编写的未签名脚本和来自其他用户的签名脚本 可以使用如下命令将计算机上的执行策略更改为RemoteSigned 执行命令set ExecutionP
  • 6-3 逆序数据建立链表(20 分)_头插法建链表

    6 3 逆序数据建立链表 20 分 本题要求实现一个函数 按输入数据的逆序建立一个链表 函数接口定义 struct ListNode createlist 函数createlist利用scanf从输入中获取一系列正整数 当读到 1时表示输入
  • CMake常用命令(二) project命令

    文章目录 语法 作用 参数 举例 示例1 基础用法 示例2 VERSION用法 示例3 DESCRIPTION用法 示例4 HOMEPAGE URL用法 示例4 LANGUAGES用法 更多细节 参考 语法 project
  • 【Python库系列】超详细的NumPy入门

    目录 一 数组的创建 1 1 创建数组 1 2 数组的性质 二 数组的存载 2 1 numpy 自身的 npy 格式 2 2 文本 txt 格式 2 3 文本 csv 格式 三 数组的获取 3 1 正规索引 3 2 布尔索引 3 3 花式索
  • 多类隶属度的模糊支持向量机(FSVM)

    模糊支持向量机 FSVM 多类隶属度分析 1 传统FSVM 隶属度分析 2 改进的FSVM 隶属度分析 3 FSVM多类隶属度分析 4 评估分类效果 不定期遴选一些高质量期刊论文 进行介绍 内容不详加解释 重在学习文章的思路 模糊隶属度来描
  • EMC基础知识

    一 概述 传导实验为测试被测设备对外界的骚扰的强度的实验 测试的频谱为150K 30MHz 充分理解实验数据 对设计整改会起到关键性作用 以下对实验波形及数据 设计整改方向进行简要说明 二 测试频域图解读 以上2张波形 是测试中经常看到的2
  • 如何用vue实现文本的编辑

    文章数据是通过后台返回html数据 之后前端通过遍历生成一个大的html 然后渲染到页面上 我如果我想要给这个文章新添加一个编辑功能应该怎么实现 文章内容不便展示 只通过文字描述完成本功能 一般的话 编辑文本可以才用富文本编辑器或者采用wo
  • java 基础重学(二)-基础知识

    1 基本数据类型 2 自动拆装箱 3 String 4 关键字 5 集合类 6 枚举 7 IO 8 反射 9 动态代理 10 序列化 11 注解 12 JMS 13 JMX 14 泛型 15 单元测试 16 正则表达式 17 异常 18 时
  • 2021年最推荐的十大进销存管理软件排名

    2021年过去一半了 相信不少商家都要准备开始在忙着盘点自己库存 对账 整理数据报表 有的人选对了店铺管理软件 只需动动手指就能快速搞定自己需要的数据 然而有的人每天晚上加班加点计算盘点 到头来数据还是一团糟 只因为没有选对店铺管理软件或者
  • RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

    文章目录 RabbitMQ第三个实操小案例 发布者 订阅者 Publish Subscribe 广播交换器 FanoutExchange 写法一 配置类配置方式 写法二 注解方式 RabbitListener RabbitMQ第三个实操小案