RabbitMQ之交换机类型

2023-11-19

一、交换机类型

在 RabbitMQ 中,交换机主要用来将生产者生产出来的消息,传送到对应的队列中,即交换机是一个消息传送的媒介,其英文被称为 exchange 。交换机在 RabbitMQ 中起着承上启下的作用。

交换机主要有四种类型:

  • direct: 直连
  • topic: 主题
  • fanout:广播
  • headers: 请求头

常用的只有前面三种:direct、topic、fanout

二、direct 交换机

所有发送到 direct 交换机的消息都被转发到RouteKey中指定的队列 queue。RouteKey必须完全匹配才会被队列接收。

RabbitMQ自带的Exchange:default Exchange就是一个 direct 类型的交换机。当生产端发送消息没有指定交换机的名称时,使用的就是该交换机。当队列名称和 routingKey 一样时,也可以不用将exchange和queue进行绑定(binding)操作。

如下代码所示:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "direct_queue";
        // 这里发布消息时,第一个参数为交换机名称,为空,则使用默认交换机 default exchange
        // 这里的第二个参数本应该是 routingKey,这里直接使用了队列名称,则就不需要再执行 exchange和queue通过 routingKey 的绑定操作了
        channel.basicPublish("", queueName, null, "didiok send a direct message".getBytes());
        channel.close();
        connection.close();
    }

    // 消费端:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "direct_queue";
        channel.queueDeclare(queueName, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println(new String(delivery.getBody()));
        }
    }

三、topic 交换机

所有发送到 topic 交换机 的消息被转发到所有 RouteKey匹配到的Queue上。这里的 routingKey 可以使用通配符进行模糊匹配:

  • 符号 # 匹配一个或多个词
  • 符号 * 匹配一个词

例如:“didiok.#” 能够匹配到 “didiok.hello”、“didiok.hello.world”、“didiok.hello.world.abc”等。而
“didiok.*”只会匹配到“didiok.hello”这类后面只带有一个词的。

代码如下:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "topic_exchange";
        String routingKey1 = "user.aa.dev";
        String routingKey2 = "user.bb";
        String routingKey3 = "user.cc";

        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .contentEncoding("UTF-8")
                .headers(new HashMap<>())
                .deliveryMode(2)
                .build();

        String message = "didiok send a topic_message~~~";
        channel.basicPublish(exchangeName, routingKey1, props, message.getBytes());
        channel.basicPublish(exchangeName, routingKey2, props, message.getBytes());
        channel.basicPublish(exchangeName, routingKey3, props, message.getBytes());
    }

    // 消费端一:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现异常 **/
        String queueName = "topic_queue1";
        String exchangeName = "topic_exchange";
        String exchangeType = "topic";
        String routingKey = "user.#";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, exchangeType, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer1 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("消息内容:" + new String(delivery.getBody()) + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
        }
    }

    // 消费端二:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        /**** 以下对 交换机和队列的声明和绑定 操作,最好不要再代码中执行,建议在 rabbitMQ控制台中进行操作,因为代码可能重复执行,导致出现问题 **/
        String queueName = "topic_queue2 ";
        String exchangeName = "topic_exchange";
        String exchangeType = "topic";
        String routingKey = "user.*";
        channel.queueDeclare(queueName,false,false,false,null);
        channel.exchangeDeclare(exchangeName,exchangeType,false, false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName,true, consumer);
        System.err.println("consumer2 starting...");

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("消息内容:"+new String(delivery.getBody())+", routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }
    

消费端一可以收到生产端发的三条消息,消费端二只能收到两条消息。

四、fanout 交换机

fanout 交换机 类似于广播,不走 routingKey,所以可以不设置 routingKey,设置了也没用。只需要简单的将队列绑定在交换机上。发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的,其次是 direct 交换机,topic 交换机最慢,因为需要根据匹配规则寻找队列(通配符找起来速度慢)。

示例代码如下:

    // 生产端:发送消息
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);


        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "fanout_exchange";
        String routingKey = "";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                 .headers(new HashMap<>())
                 .contentEncoding("UTF-8")
                 .deliveryMode(2)
                 .build();

        for(int i=0; i<10; i++){
            String msg = "发送消息的序号:"+i;
            channel.basicPublish(exchangeName, routingKey, props, msg.getBytes());
        }
        channel.close();
        connection.close();
    }

    // 消费端一:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "fanout_queue";
        String exchangeName = "fanout_exchange";
        String routingKey = "";

        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer1 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }

    // 消费端二:消费消息
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("121.43.153.00");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "fanout_queue2";
        String exchangeName = "fanout_exchange";
        String routingKey = "";

        channel.queueDeclare(queueName, false, false, false, null);
        channel.exchangeDeclare(exchangeName, "fanout", false,false,null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        System.err.println("consumer2 starting...");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            System.out.println("受到消息:"+new String(delivery.getBody())+",routingKey:"+delivery.getEnvelope().getRoutingKey());
        }
    }

因为交换机和队列进行了绑定,消费端一和消费端二都会收到生产端发送的消息。 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ之交换机类型 的相关文章

  • 使用RabbitMQ(Java客户端),有没有办法确定消费期间网络连接是否关闭?

    我在 RHEL 5 3 上使用 Java 客户端使用 RabbitMQ 我有 2 个节点 机器 Node1 正在使用 Java 帮助器类 QueueingConsumer 消费 Node2 上队列中的消息 QueueingConsumer
  • RPC 模型中的correlationId 和临时队列 - AMQP

    我正在读书RPC模型 http www rabbitmq com tutorials tutorial six java html在 AMQP 中使用 RabbitMQ 本教程创建了一个临时队列 并且还correlationId 临时队列是
  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • spring中rabbitmq监听器的异常处理

    使用spring 我是rabbitmq的新手 我想知道我错在哪里 我编写了一个rabbitmq连接工厂和一个包含侦听器的侦听器容器 我还为侦听器容器提供了错误处理程序 但它似乎不起作用 我的春豆
  • 在使用 FromEventPattern 订阅之前捕获事件

    我正在使用 Rx 框架编写消息监听器 我面临的问题是 我正在使用的库使用一个消费者 每当消息到达时就会发布事件 我已经设法通过以下方式消费传入的消息Observable FromEventPattern但我对服务器中已有的消息有疑问 目前我
  • 组在 RabbitMQ 中接收消息,最好使用 Spring AMQP?

    我正在从服务 S 接收消息 该服务将每个单独的属性更改作为单独的消息发布到实体 一个人为的例子是这样的实体 Person id 123 name Something address 如果姓名和地址在同一交易中更新 则 S 将发布两条消息 P
  • 在点网核心应用程序中使用 RabbitMQ 跳过 MassTransit 中的队列

    我有三个项目 一个是Dot net core MVC 两个是API项目 MVC 正在调用一个 API 来获取用户详细信息 当询问用户详细信息时 我通过 MassTransit 向队列发送消息 我看到跳过队列 第三个项目中有消费者 即API项
  • 在 Red Hat 上安装 RabbitMQ - 错误的 Erlang 版本

    我正在尝试按照以下说明在 Red Hat Enterprise Linux 7 64 位工作站版本 的评估虚拟机上安装 RabbitMQhttps www rabbitmq com install rpm html https www ra
  • 如何在 Node js 中保持分叉的子进程处于活动状态

    我想创建一个像带有node的foreverjs一样运行的rabbitmq cli 它可以生成 child process 并使其在后台运行 并且可以随时与 child process 进行通信 我面临的问题是 当主 cli 程序退出时 ch
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • Django、RabbitMQ 和 Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧版本的任务?

    所以我有一个 Django 应用程序 它偶尔会向 Celery 发送任务以进行异步执行 我发现 当我在开发中处理代码时 Django 开发服务器知道如何自动检测代码何时发生更改 然后重新启动服务器 以便我可以看到我的更改 然而 我的应用程序
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接

随机推荐

  • php随机生成密码函数

    随机生成密码函数 param int length 密码长度 return string function generate password length 8 密码字符集 可任意添加你需要的字符 abc abcdefghijklmnopq
  • python常用编译器和解释器的区别_详解python编译器和解释器的区别

    详解python编译器和解释器的区别 高级语言不能直接被机器所理解执行 所以都需要一个翻译的阶段 解释型语言用到的是解释器 编译型语言用到的是编译器 编译型语言通常的执行过程是 源代码 预处理器 编译器 目标代码 链接器 可执行程序 某种意
  • Linux(CentOS 或者 Ubuntu都可以)安装docker

    Linux CentOS 或者 Ubuntu都可以 安装docker 介绍下如何在Linux下面安装docker 安装方式如下 1 关闭防火墙 systemctl stop firewalld systemctl disable firew
  • 【Unity Optimize】使用对象池(Object Pooling)优化项目

    目录 1 对象池 Object Pooling 介绍 2 实现对象池脚本 3 使用对象池生成Cube 4 效果展示 5 Unity资源商店的对象池插件 1 对象池 Object Pooling 介绍 Unity中的对象池 Object Po
  • 单例模式(小小单例,一点也不小)

    小小单例 一点也不小 今天终于发现了原来单例模式还有这么多道道 单例模式解决了两个基本问题 全局访问和实例化控制 出自 大话设计模式 懒汉式单例模式 定义 要在第一次被引用时 才会将自己实例化 所以就被称为懒汉式单例模式 也就是我们常用的单
  • C 标准库 - 《assert.h》

    原文链接 https www runoob com cprogramming c standard library assert h html 简介 C 标准库的 assert h头文件提供了一个名为 assert 的宏 它可用于验证程序做
  • R: R版本更新及R包迁移(详细步骤)

    在安装R包的过程中 有时候会提醒R版本不够等情况 当需要更新R版本 又需要保证旧版本安装的R包可以完整迁移到新版本R时 可通过 installr 包实现 install packages installr library installr
  • python使用SMTP发送邮件

    SMTP是发送邮件的协议 Python内置对SMTP的支持 可以发送纯文本邮件 HTML邮件以及带附件的邮件 Python对SMTP支持有smtplib和email两个模块 email负责构造邮件 smtplib负责发送邮件 首先 我们来构
  • ARC105

    C Camels and Bridge 题意 一堆骆驼过桥 每个桥有承重和长度 问骆驼从头到尾的最近距离 假设这时候骆驼的过桥顺序已经安排好 每一个桥相当于一个限制条件 限制了 l r 的最近距离 也就是说 对于每一个骆驼 j 要保证 好了
  • OJ题目8--动态规划问题

    1 509 斐波那契数 力扣 LeetCode leetcode cn com 过去一直用递归法 但由于栈区空间的限制 当递归过深时容易发生栈溢出 int fib int n if n 0 return 0 else if n 1 retu
  • css按钮样式

    创建漂亮的 CSS 按钮的 10 个代码片段 IT程序狮子烨 1 个月前 如果你正在寻找一些高质量的 CSS 按钮的示例 那么这篇文章一定是你的 菜 在本文中 我们从 CodePen 上收集了 10 个独特的 CSS 按钮合集 并附有它们的
  • linux文件系统初始化过程(4)---加载initrd(中)

    一 目的 上文详细介绍了CPIO格式的initrd文件 本文从源代码角度分析加载并解析initrd文件的过程 initrd文件和linux内核一般存储在磁盘空间中 在系统启动阶段由bootload负责把磁盘上的内核和initrd加载到指定的
  • 苹果steam手机令牌未能连接服务器,steam手机令牌登不上怎么办(6种原因方法轻松解除)...

    引用自 平底锅揽件指南 随着 绝地求生 游戏的回温 最近芝士君收到了好多小伙伴关于 令牌 的问题 在这里为大家专门出一篇文章科普一下 好好看完这篇文章 以后妈妈再也不用担心我 绝地求生 游戏令牌出问题啦 在这里 芝士把大家遇到的问题总结为5
  • Java 中Arrays工具类的使用

    博主前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住也分享一下给大家 点击跳转到网站 介绍 java util Arrays类即为操作数组的工具类 包含了用来操作数组 比如排序和搜索 的各种算法 下面我用代码给大家演示一下
  • 十二. Kubernetes Pod 与 探针

    目录 一 Pod Pod 中的多容器协同 Pod 的组成与paush 重要 Pod 的生命周期 Pod状态与重启策略 静态Pod 二 探针 1 livenessProbe存活探针 2 readinessProbe就绪探针 3 startup
  • 页面滚动动画库,快看看

    本文属xxKarina原创 转载请注明 个人博客地址 https xxkarina github io 前端涉及的领域真的很广 但是粗略的划分的话 其实就是简单的三要素 html css js 当然 这些基本的Web前端技术是远远不足以让你
  • SpringBoot+mybatis+thymeleaf实现登录功能

    项目文件目录一栏 2 开始工作 先按照上图建立好相应的controller mapper等文件 接着进行一个配置 首先是application properties server port 8080 启动端口 加载Mybatis配置文件 m
  • 2023 年如何将您的应用提交到 App Store

    您夜以继日地工作来创建您的梦想应用程序 最后 是时候向全世界宣布您的应用程序了 但不知道如何将您的应用提交到 App Store 为您的商店获取现成的移动应用程序 将应用程序提交到 App Store 可能是一项复杂的任务 但在本指南的帮助
  • 揭开智能卡的面纱

    一 概述 ICC是Integrated Circuit Card的缩写 意思是集成电路卡 我们通常把它称为智能卡 Smart Card 智能卡应用广泛 它可以用来保存私人密码 银行账号 个人资料等 那么如何编写应用程序 从智能卡上读出或向其
  • RabbitMQ之交换机类型

    一 交换机类型 在 RabbitMQ 中 交换机主要用来将生产者生产出来的消息 传送到对应的队列中 即交换机是一个消息传送的媒介 其英文被称为 exchange 交换机在 RabbitMQ 中起着承上启下的作用 交换机主要有四种类型 dir