RabbitMQ消息队列实战(1)—— RabbitMQ的体系

2023-11-08

RabbitMQ是一个开源的消息代理和队列服务器,用来在不同的应用之间共享数据。1983年,被认为是RabbitMQ的雏形的Teknekron创建,首次提出了消息总线的概念。中间经历过数个阶段的发展,一直到2004年,AMQP(Advanced Message Queuing Protcol)协议开发完成,这是一款具有开放标准的通信协议,它允许任何人都可以执行这一标准,编码标准的任何人都可以和任何AMQP供应商提供的MQ服务器进行交互。直到2007年,RabbitMQ 1.0正式诞生,RabbitMQ开始在金融、电信领域大面积的使用。

RabbitMQ使得生产者和消费者真正做到了解耦。引入了代理服务器,可以实现高可用和弹性的横向拓展。通过增加服务器实例的方式,RabbitMQ可以做到每秒处理百万级别的消息。本文主要介绍RabbitMQ的逻辑体系结构,先是简单介绍其整个体系架构以及相关名词的介绍,然后介绍RabbitMQ三种消息路由的方式(因第四种Header模式基本不会使用,这里不包括在内),最后介绍其典型的应用场景。

一、RabbitMQ的体系架构

我们知道RabbitMQ最初被用来主要是做应用之间的解耦,所以它的整体的架构大概分为三个部分:生产者(Publisher),消费者(Consumer)和消息代理(Broker)。其中最核心的是消息代理,其内部结构稍显复杂,主要针对不同的应用场景,实现不同的消息处理逻辑。我们先来看下RabbitMQ的逻辑架构图:

下面,分别对这些名词进行解释:

Publisher:消息的生产者

Broker:消息代理,通常是一个服务实体。

VHost:Virtual Host,主要用来隔离一批交换机、队列,在每个虚拟主机中都可以创建自己的交换机、队列和绑定,虚拟主机之间互相不可见。

Connection:一个网络连接

Channel:信道,AMQP中引入了信道进行消息的传递。主要是因为操作系统中创建和销毁一个TCP连接代价非常昂贵,信道的引入就是为了复用一个TCP连接,通过多线程的方式来传递消息。通常情况下,一个信道对应一个线程。

Exchange:用来接受消息,并负责把消息传递给队列。

Queue:用来存放消息的容器。

Binding:用来关联交换机和队列的绑定,只有绑定了之后,交换机才知道向哪个队列发送消息。

二、RabbitMQ的路由策略

根据交换机的类型,RabbitMQ的的路由策略可以分为三种:直连默认(Direct)、主题模式(topic)、广播模式(fanout)。接下来,我们分别进行介绍。不过,在此之前,我们需要先了解下RabbitMQ的消息发布的命令,因为结合这个命令,我们才能购真正理解RabbitMQ这几种路由策略的原理,笔者以Java代码的API来解释这个命令:

 channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);

exchange:交换机的名称,在直连模式下为空。

routingKey:路由键,直连模式下是队列的名称,主题模式下是主题的名称,广播模式下为空。

props:发送消息时附带的一些附加参数

body:发送的消息转换成的二进制数组

2.1 直连模式

所谓的直连模式,就是直连交换机直接将消息路由到对应的队列中。在直连模式下,使用basicPublish在publish消息时,routingKey必须设置成目标队列的名称。而exchange可以设置成我们创建的直连交换机的名称,如果设置则使用RabbitMQ安装时默认的直连交换机。

要想使用直连模式路由消息,必须先创建直连交换机。当安装完成RabbitMQ后,会默认创建1个直连交换机(下图中圈出部分),如果我们在basicPublish时不指定交换机名称,会使用这个默认交换机(直连模式下):

除此之外,我们也可以创建支持直连模式的交换机,如下:

channel.exchangeDeclare("my-direct-exchange", "direct", true, false, null);

如上,我们创建了一个名称为my-direct-exchange的交换机,第二个参数指定了交换机的类型为direct类型。完成了交换机的创建之后,接下来创建一个队列my-queue用来接受交换机的消息:

 channel.queueDeclare("my-queue", true, false, false, null);

最后,创建一个binding,将交换机和队列关联起来:

 channel.queueBind("my-queue", "my-direct-exchange", "my-queue");

queueBind的第1个参数是要关联的队列名称,第2个参数是要关联的交换机名称,第三个参数是路由键,在直连模式下要填写队列名称。

完成了上述的过程之后,我们就创建了一套完整的直连交换机、队列和绑定,实现了以direct的方式路由消息。发布消息时,路由键填写队列名称:

channel.basicPublish("direct-exchange", "my-queue",...);

2.2 主题模式

RabbitMQ的第二种路由模式是topic模式,topic类型的交换机会根据发送消息的topic寻找使用binding绑定了该topic的的队列,然后将消息路由到该队列上。

需要注意的是,这种交换机和队列的关系是一对多的,也就是是说,多个队列可以通过同一个主题和同一个交换机绑定:

使用主题模式的路由策略,同样先创建topic类型的交换机:

 channel.exchangeDeclare("topic-exchange", "topic", true, false, null);

topic-exchange:交换机名称

topic:交换机类型,固定是topic

队列的创建,这里不再演示,因为队列没有direct、topic和fanout类型之分,我们还是沿用上面创建的队列my-queue来进行接下来的说明。与direct路由策略的bind不同,在为topic交换机和队列创建bind时要明确得指出主题:

channel.queueBind("my-queue", "topic-exchange", "fruit");

可以看到,在第3个参数路由键中,我们明确得指出了topic为fruit。

然后,在发布消息时,也要明确得指出消息的topic:

channel.basicPublish("topic-exchange", "fruit",...);

2.3 广播模式

RabbitMQ最后一种路由策略是fanout模式,该模式的路由原理如下图:

在该模式下,只要将队列和fanout交换机进行了绑定,那么向fanout交换机发布的所有的消息都会路由到所有绑定的队列中。与之前创建两种类型的交换机类似,创建fanout交换机时,需要指定交换机的类型为fanout:

channel.exchangeDeclare("fanout-exchange", "fanout", true, false, null);

在创建bind时不需要指定路由键:

channel.queueBind("my-queue", "fanout-exchange", "");

在发布消息时,同样不需要指定路由键,fanout 交换机会把消息发送到所有绑定它上面的队列中:

channel.basicPublish("fanout-exchange", "", ...);

2.4 关于Exchange、Queue和Bind之间的关系

可能刚刚接触RabbitMQ的童鞋,会被这三者的关系绕晕。我最开始学习RabbitMQ时,也是花了很长时间才把这三者之间的关系梳理清楚。笔者这里所说的关系,是指1个Exchange对应1个Queue,或者1个Exchange对应多个Queue等这种数量上的对应关系。针对这种关系,最终我发现了一个事实,总结如下:

(1)三者之间这种数量上的对应关系和RabbitMQ的路由策略以及Exchange的类型无关。只要保证一个bind对应在创建时绑定了一个exchange和一个queue即可。

(2)一个exchange可以和多个queue通过多个bind对象绑定,不管exchange是什么类型。

(3)一个queue可以和多个exchange通过多个bind对象绑定,不管exchange是什么类型。

三、RabbitMQ的应用场景

笔者,根据RabbitMQ的特性和使用目的,将RabbitMQ的应用场景归为了三类:

  • 将消费者和生产者解耦。

这是RabbitMQ被提出来最基本也是最初的目的。通过这种解耦,至少可以实现两种效果:

(1)原本需要轮询获取生产者消息的消费者代码,可以通过监听RabbitMQ队列实现有消息才处理,从而省去了轮询的过程。

(2)通过RabbitMQ主题交互机或者广播交换机,可以轻松实现当有新的消费者加入时,而不需要修改生产者代码。比如在充值时,我们可以发送一个充值事件,日志服务捕获充值事件后写入日志。如果后面有新的需求,比如积分服务,只需要新增积分服务即可,无需修改生产者代码。

  • 实现分布式事务

实现分布式事务。利用RabbitMQ代理可持久化,可做高可用以及消息的ack机制,可以实现分布式事务,比如saga模式,ebay的本地事务表。

  • 实现异步调用,提高性能。

在微服务架构的系统种,要实现某种业务往往需要调用多个其它服务的接口,如果采用同步调用的方式,需要在上一个接口调用完成之后再去调用下一个接口,性能势必不高。如果使用RabbitMQ作为代理进行异步调用,几乎可以实现和只调用一个接口同样的耗时。而且采用这种方式,还有一个好处,就是可以有选择地忽略完成该业务非必要地接口调用。

四、总结

本文主要介绍了RabbitMQ的体系结构、三种路由模式和应用场景,结合上文,现总结如下:

(1)RabbitMQ中通过vhost实现了数据的隔离,每个vhost下都有自己的exchange和queue,可以以vhost为单位对用户进行授权。

(2)RabbitMQ中有三种路由策略:

  • direct策略 —— 需要创建direct exchange,和queue绑定时指定路由键为队列名称,在发布消息时需要以目标队列的名称作为路由键。

  • topic策略 —— 需要创建topic exchange,和queue绑定时需要将明确的topic作为路由键,发布消息时需要将topic作为路由键。

  • fanout策略 —— 需要创建fanout exchange,和queue绑定时无需指定路由键,发布消息时无需指定路由键。

(3)一个exchange可以和多个queue通过多个bind对象绑定,不管exchange是什么类型。

(4)一个queue可以和多个exchange通过多个bind对象绑定,不管exchange是什么类型。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ消息队列实战(1)—— RabbitMQ的体系 的相关文章

  • Symfony Messenger 和 Mailer:如何添加绑定密钥?

    我有一个正在运行的 Symfony 4 4 项目 其中包含Messenger和rabbitMQ 我有一个带有 2 个队列的异步传输 transports https symfony com doc current messenger htm
  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • RabbitMQ 用户在预先创建的队列上发布/订阅的权限

    我有一个用例 我需要创建一个用户并授予他仅在现有队列上发布 订阅的权限 这是一个示例 虚拟主机 mainvhost 对于所有用户都相同 在虚拟主机内 我有 A foo 和 Q bar 队列 用户 foo 只能发布 订阅到 Q foo 用户
  • amqp 与 amqplib - 哪个 Node.js amqp 客户端库更好?

    这些 amqp 客户端库之间有什么区别 哪一款最值得推荐 主要区别是什么 我会推荐amqp node https github com squaremo amqp node and bramqp https github com bakke
  • 为什么我无法使用 python 建立与rabbitMQ的连接?

    我正在学习如何使用rabbitMQ 我正在 MacBook 上运行rabbit MQ 服务器并尝试与 python 客户端连接 我按照安装说明进行操作here http www rabbitmq com install homebrew h
  • 在 RabbitMQ 主题交换中路由与模式不匹配的消息

    两个队列绑定到具有以下路由键的主题交换 队列 A 与路由键模式匹配绑定 foo队列 B 与路由键模式匹配绑定 bar 我想向此交换添加第三个队列 该队列接收的消息都不是foo消息也不bar消息 如果我用一个绑定这个队列 路由密钥 我自然会得
  • 与 RabbitMQ 相比,Amazon SQS 的性能较慢

    我想在我的 Web 应用程序中集成消息队列中间层 我测试了 Rabbitmq 和 Amazon SQS 但发现 Amazon SQS 速度很慢 我在 Amazon SQS 中每秒收到 80 个请求 而在 Rabbitmq 中每秒收到 200
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • rabbitmq-erlang-client,使用 rebar 友好的 pkg,在开发环境上工作在 rebar 版本上失败

    我成功地将rabbitmq erlang client的rebar友好包用于一个简单的Hello World rebarized和OTP 兼容 应用程序 并且在开发环境中工作正常 我能够启动 erl 控制台并执行我的操作applicatio
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • Spring AMQP RabbitMQ 如何直接发送到Queue而不需要Exchange

    我正在使用 Spring AMQP 和 Rabbitmq 模板 如何直接将消息发送到队列而不使用Exchange 我该怎么做 我该怎么做 你不能 发布者不知道队列 只是交换和路由密钥 但是 所有队列都绑定到默认交换器 以队列名称作为其路由键
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方

随机推荐

  • 漫谈ELK在大数据运维中的应用

    圈子里关于大数据 云计算相关文章和讨论是越来越多 愈演愈烈 行业内企业也争前恐后 群雄逐鹿 而在大数据时代的运维挑站问题也就日渐突出 任重而道远了 本文旨在针对复杂的大数据运维系统推荐一把利器 达到抛砖引玉的效果 如果文中出现任何纰漏和错误
  • adfs服务器获取信息失败,在使用Fiddler或其他诊断工具时无法登陆到ADFS服务器

    在使用Fiddler或其他诊断工具时无法登陆到ADFS服务器 03 29 2016 2 分钟可看完 本文内容 问题描述 当使用Fiddler或其他诊断工具进行ADFS 排错时 用户从内部登录ADFS会反复弹窗要求进行身份验证 示例图如下 问
  • 教你一招永久去除WPS广告

    WPS的广告挺烦人的 一直以为无法去除 直到打开了配置工具 隐藏的够深的 首先打开WPS的配置工具 打开高级 选择其他选项 然后WPS广告的勾选项全部去掉
  • UbuntuServer虚拟机安装

    UbuntuServer虚拟机安装 目录 UbuntuServer虚拟机安装 环境 步骤 创建UbuntuServer虚拟机 UbuntuServer安装 环境 VMware Workstation Pro 15 1 0 Ubuntu Se
  • 【Spark NLP】第 2 章:自然语言基础

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • pycharm下载安装

    接下来安装pycharm 1 首先从网站下载pycharm 点击打开链接 链接为 http www jetbrains com pycharm download section windows 进入之后如下图 根据自己电脑的操作系统进行选择
  • Linux的介绍

    简介 主要介绍Linux的概念 Linux是一款操作系统 类似与Windows 开源 免费 安全 高校 稳定 非常擅长处理高并发 现在大多数企业级项目都是部署在Linux系统的服务器中运行 Linux的创始人是Linus 吉祥物是一只叫Tu
  • 深入理解Mysql索引底层数据结构与算法

    索引是帮助MySQL高效获取数据的排好序的数据结构 索引数据结构对比 二叉树 左边子节点的数据小于父节点数据 右边子节点的数据大于父节点数据 如果col2是索引 查找索引为89的行元素 那么只需要查找两次 就可以获取到行元素所在的磁盘指针地
  • [labtools 27-2269]no devices detected on target localhost问题解决

    小白刚学FPGA 以流水灯为例入门 在连接板子的时候遇到了这个问题 记录一下 板子型号 xc7z045ffg900 2 解决办法之一 按照table 1 11更改图1 3里34位置处拨码
  • Linux系统中 systemd-journaldCPU占用异常的解决方法

    一 待解决问题 先贴几张图 问题解决之前最头疼的问题 因打印日志的高占用 以致CPU占用高达96 已经无法满足日常使用 从图中可见systemd journald占用了1 4的CPU资源 注 我是用的是Deepin系统 二 解决办法 因为要
  • SpringBoot2.x 集成Hadoop3.0.3 实现HDFS文件系统管理

    任务要求 搭建SpringBoot 2 x 集成Hadoop3 0 3环境 实现Hadoop 重要组成部分HDFS 文件系统管理的封装 核心pom xml 文件
  • vSphere之vCLS

    vCLS vSphere Cluster Services 是在vSphere7 0U1引入的集群服务 它使用代理虚拟机维护集群服务的运行状况 当主机添加到集群时 将创建 vCLS 代理虚拟机 vCLS vm 每个 vSphere 集群中最
  • Dell工作站8T硬盘安装ubuntu 16.04

    Dell工作站8T硬盘安装ubuntu 16 04 MBR文件系统仅支撑2T磁盘 因此在2T以上磁盘上安装ubuntu时 如果想利用全部磁盘空间 需要采用GPT分区 文件系统 模型 这需要重新分区 制作Ubuntu 16 04启动U盘 一
  • js-语言基础进阶-变换按钮的实现

    作者 芝士小熊饼干 系列专栏 数据结构 蓝桥杯 算法 坚持天数 16天
  • 从零搭建Maven私有仓库

    前言 主要使用到的技术 linux docker sonatype nexus maven 1 nexus3介绍 世界上第一个也是唯一一个免费使用的通用工件存储库 2 使用docker安装nexus3 1 下载 使用命令 docker pu
  • 20201105枚举课后总结

    文章目录 枚举 210733 奶牛碑文 http wikioi cn problem 210733 题目描述 输入格式 输出格式 样例输入 样例输出 思路 1 2 代码 210792 分解质因数 http wikioi cn problem
  • java会话技术--03--Session覆盖问题

    java会话技术 03 Session覆盖问题 代码位置 https gitee com DanShenGuiZu learnDemo tree master sessionCookie learn 1 现象 同一域名 同一个服务 不同的端
  • 如何在 vue 项目中引入高德地图

    文章目录 前言 一 申请 地图api开发者key 二 在vue项目安装高德地图的包 三 使用 1 在自己的组件中引入高德地图类 2 编写初始化函数 3 添加插件 前言 相信在 web 开发中有不少项目都用到过地图 那么我们怎么在自己的项目中
  • 程序删除自己

    void DeleteApplicationSelf std string strFileName this char szCommandLine MAX PATH 10 设置本进程为实时执行 快速退出 SetPriorityClass G
  • RabbitMQ消息队列实战(1)—— RabbitMQ的体系

    RabbitMQ是一个开源的消息代理和队列服务器 用来在不同的应用之间共享数据 1983年 被认为是RabbitMQ的雏形的Teknekron创建 首次提出了消息总线的概念 中间经历过数个阶段的发展 一直到2004年 AMQP Advanc