一张图进阶 RocketMQ - 整体架构

2023-05-16

前 言

三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片链接,关于 RocketMQ 你只需要记住这张图!如果你第一次看到这个系列,墙裂建议你打开链接。觉得不错的话,记得点赞关注哦。

本文是《一张图进阶 RocketMQ》系列的第 1 篇,今天的内容主要分为两个部分:

  • 整体架构:会从大家熟悉的“生产者-消费者模式”逐步推出 RocketMQ 完整架构,只需要记住一张完整的架构图即可。
  • 消息收发示例:通过 Docker 部署 RocketMQ,并用简单的示例串起 RocketMQ 消息收发流程。

视频同步更新,欢迎围观 一张图进阶 ROcketMQ -整体架构 (视频版),记得调整清晰度哦~

视频同步上线,轻轻涨姿势~

整体架构

什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?

别急,先来回顾一下 “生产者-消费者模式” 这个老朋友。简单来说,这个模型是由两类线程和一个队列构成:

  • 生产者线程:生产产品,并把产品放到队列里。
  • 消费者线程:从队列里面获取产品,并消费。

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

这意味着什么呢,生产者和消费者之间实现解藕异步。这就老厉害了,你未来工作学习中会不断遇到这些概念,但是越看越看不懂。我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。

具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了。

生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是我们就需要区分不同类型消息,相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组消费者组

但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt。于是物业在东南西北门各设了一个外卖快递放置点。也就是我们有多个队列,组成 队列集群

可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀。我们把导航的信息称为路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。RocketMQ 为这些路由信息的设置了管理员 NameServer,当然 NameServer 也可以有很多个,组成 NameServer 集群。

到这里,你就应该知道 RocketMQ 里面都穿了什么啦。包括了生产者(Producer),消费者(Consumer),NameServer 以及队列本身(Broker)。Broker 是代理的意思,负责队列的存取等操作,我们可以把 Broker 理解为队列本身。

  • NameServer:我们可以同时部署很多台 NameServer 服务器,并且这些服务器是无状态的,节点之间无任何信息同步。
    NameServer 起来后监听 端口,等待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理中心。

  • Broker:Broker 启动,跟所有的 NameServer 保持长连接,每 30s 发送一次发送心跳包(像心跳一样持续稳定的发送请求)。心跳包中包含当前 Broker 信息 ( IP+ 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

    我们可以同时部署多个 Master Broker和多个 Slave Broker,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 为 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。(可以暂时忽略 Broker 的主从角色)

  • Topic:收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

  • Producer:Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,采用轮询策略从选择一个队列,然后与队列所在的 Broker 建立长连接,并向 Broker 发消息。

  • Consumer:Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

    我们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组和消费者组的概念。这里需要补充说明一下,消息分为集群消息和广播消息:

    • 集群消息:一个 Topic 的一条消息,一个消费者组只能有一个消费者实例消费。例如,同样是外卖 Topic,一份外卖,我们整个小区也只有一个人消费,就是集群消费。

    • 广播消息:一个 Topic 的一条消息,一个消费者组所有消费者实例都会消费。例如,如果是因为疫情,政府发放食品,那我们小区每个人都会消费,就是广播消费。

消息收发示例

RocketMQ 部署

刚刚我们了解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发消息呢?需要先通过 Docker 部署一套 RocketMQ:

如果你没有安装 Docker,可以根据菜鸟教程 MacOS Docker 安装/Windows Docker 安装 进行安装。然后,通过 docker-compose 部署 RocketMQ:

  • 克隆 docker-middleware 仓库,打开 middlewares 目录;
  • 修改./conf/broker.conf中的brokerIP1 参数为本机 IP;
  • 进入docker-compose.yml文件所在路径,执行docker-compose up命令即可;

注意:如果你现在不了解 Docker 不重要,只需要按照步骤部署好 RocketMQ 即可,并不会阻碍我们理解 RocketMQ 相关内容。

在这里插入图片描述

部署完成后我们就可以在 Docker Dashboard 中看到 RocketMQ 相关容器,包括 Broker、NameServer 及 Console(RocketMQ 控制台),到这里我们就可以使用部署的 RocketMQ 收发消息了。

RocketMQ 已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 RocketMQ 的 “Hello World”。

消息发送

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
      	// 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
  • 首先,实例化一个生产者 producer,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。
  • 然后 producer 得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先建立通信连接等。
  • producer 已经准备好了,那得准备好要发的内容,把 “Hello World” 发送到 Topic1。
  • 内容准备好,那 producer 就可以把消息发送出去了。producer 怎么知道 Broker 地址呢?他就会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。
  • 生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。

消息接收

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
    	// 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
    	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        erbconsumerijun.subscribe("sancijun", "*");
    	// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
	}
}
  • 首先,实例化一个消费者 consumer,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。
  • 然后这个消费者需要知道自己可以消费哪些 Topic 的消息,也就是每个消费者需要订阅一个或多个 Topic。
  • 消费者也需要做一些初始化,业务本身并没有理会怎么从 Broker 拉取消息,这些都是消费者默默无闻的奉献。所以,我们需要启动消费者,消费者会从 NameServer 拉取路由信息,并不断从 Broker 拉取消息。拉取回来的消息提供给业务定义的 MessageListener。
  • 消息拉取回来后,消费者需要怎么处理呢?每个消费者都不一样(业务本身决定),由我们业务定义的 MessageListener 处理。处理完之后,消费者也需要确认收货,就是告诉 Broker 消费成功了。

以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。

最后,看懂的点赞,没看懂的收藏。来都来了,交个朋友,遇到什么问题都可以和三此君分享呀。

参考文献

  • RocketMQ 官方文档

  • RocketMQ 源码

  • 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.

  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.

  • 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.

转载请注明出处

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一张图进阶 RocketMQ - 整体架构 的相关文章

  • 程序员经典语录

    程序员编程语录 1 一个好的程序员是那种过单行线马路都要往两边看的人 xff08 Doug Linder xff09 2 程序有问题时不要担心 如果所有东西都没问题 xff0c 你就失业了 xff08 软件工程的Mosher定律 xff09
  • 使用fastboot命令刷机流程详解

    一 Fastboot是什么 1 1 首先介绍Recovery模式 卡刷 在系统进行定制时 xff0c 编译系统会编译出一份ZIP的压缩包 xff0c 里面是一些系统分区镜像 xff0c 提供给客户进行手动升级 恢复系统 需要提前将压缩包内置
  • 【谷歌插件】谷歌插件制作

    文章目录 谷歌浏览器插件制作教程实现步骤成功示例问题未封装的扩展程序并非来自 Chrome 网上应用商店 谷歌浏览器插件制作 教程 教程1 xff1a https blog csdn net github 35631540 article
  • 一个刚毕业大学生的四个月苦逼程序员经历

    先来一个自我介绍 大学时排名老三 就暂且叫老三吧 xff0c 毕业于河南的一个还算可以的二本院校 xff0c 专业 地球信息科学与技术 首先介绍一下我的专业 xff0c 听着名字很高大上 xff0c 其实 xff0c 我们都叫他四不像专业
  • JS中的require、import、default、export

    刚开始学的时候经常弄混总结一下 xff1a 懒人 xff1a 1 require xff08 导入 xff09 是Commonjs的规范与module exports xff08 导出 xff09 搭配使用 2 import xff08 导
  • Ubuntu安装python3

    sudo apt get install python3 安装python3 xff0c 安装完之后系统默认还是python2 xff0c 要删除python link文件 sudo rm rf usr bin python 然后建立新连接
  • ubuntu安装shutter出现E:无法修正错误

    使用Ubuntu16 04安装shutter时出现如下错误 通过换源可以解决
  • Ubuntu不能访问Windows分区

    将Windows的快速启动关闭即可解决次问题 在电脑中安装了双系统 xff0c 但有时候在Ubuntu中访问Windows分区会出现如下错误 xff1a 以前出现过这种错误 xff0c 是因为windows系统没有完全关闭 xff0c 当时
  • Ubuntu和Windows双系统时间不对的解决办法

    在使用一系统再切换到另一个系统之后 xff0c 系统时间好像是停留在上次关闭该系统的时间 在网上的解决办法通常是 xff1a sudo gedit etc default rcS xff0c 将UTC 61 yes改成UTC 61 no 但
  • Ubuntu出现依赖关系问题 - 仍未被配置问题

    安装软件包时候出现如下错误 xff1a 但这并不是依赖问题 xff0c 使用sudo apt get f install 无法解决 其实问题是因为这六个软件包没有被完全安装或卸载 在安装其他软件的时候会出现 xff1a 就是指这六个软件 使
  • 熬夜总结!最全的Pycharm常用快捷键大全!

    版权声明 xff1a 本文为博主原创文章 xff0c 遵循 CC 4 0 BY SA 版权协议 xff0c 转载请附上原文出处链接和本声明 本文链接 xff1a https blog csdn net momoda118 article d
  • iOS底层-对象里都有什么

    前言 上篇文章说了iOS中alloc方法是怎么创建对象的 xff0c 以及对象的本质是结构体 接下来继续探究对象的内存分布 xff0c 以及对象的isa是个什么样的结构体 xff0c 存储了哪些信息 对象内存分布 已知系统给对象分配内存是1
  • iOS底层-类的三顾茅庐(一)

    前言 了解完对象的底层 xff0c 知道isa指向的是类对象 那么类 xff08 Class xff09 的本质究竟是什么 xff1f 本文顺序isa的指向 xff0c 探索类的继承链 xff0c 和类对象的结构 xff0c 并且尝试获取方
  • iOS底层-alloc方法之旅

    前言 通过汇编调试和源码分析 xff0c 介绍iOS开发当中alloc方法到底做了什么 追踪 alloc 实例化一个对象往往是通过 xxx alloc init 那么alloc和init的区别是什么 xff1f 将两个方法分开调用 xff0
  • iOS底层-类的三顾茅庐(二)

    前言 上篇文章分析了objc class里存储数据的bits xff0c 了解到方法和属性的存储的位置class rw t xff08 以下简称rw xff09 本文将继续研究rw里包含的其他内容 类数据的存储 书接上文 xff0c rw结
  • iOS底层-类的三顾茅庐(三)

    前言 上文讲解完了类对象的结构体objc class用来存储类信息的成员bits xff0c 整个结构还剩下方法的缓存cache xff0c 放在压轴来讲解 简化版 struct objc class objc object 类对象指针 x
  • iOS底层-消息发送机制

    前言 通过对类的缓存探索了解到方法缓存在类对象的成员cache中 xff0c 而缓存的目的是为了方法调用的时候能更快的进行响应 缓存的时候 xff0c cache t结构体用到insert方法进行插入的 xff0c 那么本次就探索怎么读取
  • iOS底层-消息的转发

    前言 上篇文章介绍了方法调用的本质是消息发送 那如果经过查找后 xff0c 没有找到方法 xff0c 系统会怎么处理 xff1f 这就是本文接下来介绍的方法的动态决议和消息转发 动态决议 当方法查找一直查到父类为nil之后 xff0c 有i
  • 指针地址+1的理解

    指针向后移动一个单位 xff0c 如果是char指针 xff0c 就是1 xff0c 如果是int指针 xff0c 就是4 xff0c 如果是数组 xff0c 还要看是哪一维的下标 xff0c 要加上相应的维 include lt stdi
  • ConcurrentHashMap、synchronized与线程安全

    最近做的项目中遇到一个问题 xff1a 明明用了ConcurrentHashMap xff0c 可是始终线程不安全 除去项目中的业务逻辑 xff0c 简化后的代码如下 xff1a public class Test40 public sta

随机推荐

  • Spring MVC集成slf4j-logback

    1 Spring MVC集成slf4j log4j 关于slf4j和log4j的相关介绍和用法 xff0c 网上有很多文章可供参考 xff0c 但是关于logback的 xff0c 尤其是Spring MVC集成logback的 xff0c
  • 安装spinningup填坑ERROR: Could not build wheels for mpi4py which use PEP 517

    深度强化学习教程 xff1a Spinning Up项目中文版 Spinning Up 文档 ERROR Failed building wheel for mpi4py Failed to build mpi4py ERROR Could
  • Spring Bean 创建过程

    0 通常 xff0c 无论是DispatcherServlet ContextLoaderListener还是ClassPathXmlApplicationContext xff0c 首次实例化bean的入口并不是在每次调用getBean的
  • MySQL DataSource 性能对比(2015-8-19)

    1 本地性能测试耗时 xff08 一 xff09 共同条件 xff1a 测试程序与数据库在同一台主机上 xff0c 各DataSource均采用默认配置 xff0c 每个线程循环1000次 xff0c 查询语句为select from ta
  • MySQL 乐观锁 简例

    乐观锁与悲观锁不同的是 xff0c 它是一种逻辑上的锁 xff0c 而不需要数据库提供锁机制来支持 当数据很重要 xff0c 回滚或重试一次需要很大的开销时 xff0c 需要保证操作的ACID性质 xff0c 此时应该采用悲观锁 而当数据对
  • HTML5 Canvas 初步:字符串,路径,背景,图片

    HTML5中新增了 lt canvas gt 画布标签 xff0c 通过它 xff0c 可以使用JavaScript在网页中绘制图像 lt canvas gt 标签在网页中得到的是一个矩形空白区域 xff0c 可以通过width和heigh
  • CSS 伪类与伪元素

    CSS的元素选择器除了根据id xff08 xff09 class xff08 xff09 属性 xff08 xff09 选取元素以外 xff0c 还有很重要的一类 xff0c 就是根据元素的特殊状态来选取元素 它们就是伪类和伪元素 跟id
  • CSS3 动画效果总结

    CSS3添加了几个动画效果的属性 xff0c 通过设置这些属性 xff0c 可以做出一些简单的动画效果而不需要再去借助JavaScript CSS3动画的属性主要分为三类 xff1a transform transition以及animat
  • Javassist学习总结

    要想将编译时不存在的类在运行时动态创建并加载 xff0c 通常有两种策略 xff1a 1 动态编译 2 动态生成二进制字节码 xff08 class xff09 对于第二种策略 xff0c 实际上已经有诸多比较成熟的开源项目提供支持 xff
  • viewstub学习笔记

    当需要在运行时动态改变布局的情况下 xff0c 使用viewstub来进行动态的布局架构是逻辑简单控制灵活的 xff0c 并且相比于设置view gone来说viewstub更加的轻量化 xff0c 只有当调用了viewstub infla
  • 程序员的酸甜苦辣——告别Coding

    程序员的酸甜苦辣 告别Coding lt script language 61 34 javascript 34 type 61 34 text javascript 34 gt document title 61 34 程序员的酸甜苦辣
  • 基于参考注释的RNA-seq分析

    Step 1 构建参考序列索引 xff1a mkdirbti 在 stuXX 目录下新建文件夹 cd bti ln s database peixun2015 ref ath fa 在当前目录 xff0c 建立参考序列文件的超链接 bowt
  • maddpg 复现过程中遇到的问题

    最近在复现论文Multi Agent Actor Critic for Mixed Cooperative Competitive Environments https github com openai multiagent partic
  • "error while loading shared libraries: xxx.so.x" 错误的原因和解决办法

    一般我们在Linux下执行某些外部程序的时候可能会提示找不到共享库的错误 比如 tmux error while loading shared libraries libevent 1 4 so 2 cannot open shared o
  • SUN VirtualBox 的命令行启动/关闭方法简介

    我们可以使用VBxManager 命令行管理工具来查看当前的虚拟基设置 和状态 QHo 64 qhoferrari1k VBoxManage list vms 下面通过实例来说明如何从命令行启动和关闭VirtualBox 虚拟机的两种常用方
  • 运维日记011 - Ubuntu下更改初始用户名的方法

    运维日记011 Ubuntu下更改初始用户名的方法 引子 Ubuntu每两年一次发布的LTS版本都会倍加引人关注 xff0c 因为LTS版本有长达五年的支持周期 xff0c 对于我等不是太喜欢折腾尝鲜而是希望几年之内不用重装系统的用户颇有吸
  • SWIG学习记录(一)SWIG基础

    SWIG学习记录 1 什么是SWIG 1 2 特性1 2 1 预处理 2 SWIG安装3 SWIG基础介绍3 1 运行SWIG3 1 1 输入格式3 1 2 输出3 1 3 注释3 1 4 预编译3 1 5 SWIG指令3 1 6 解析器的
  • Cpp--重载全局的new和delete

    include lt iostream gt include lt process h gt include lt string h gt include lt stdio h gt define MAX SIZE 30000 char M
  • 国外知名音频库一站式资料和简介

    Speex Speex http blog csdn net xyz lmn article details 8013490 简介 Speex是一套主要针对语音的开源免费 xff0c 无专利保护的音频压缩格式 Speex工程着力于通过提供一
  • 一张图进阶 RocketMQ - 整体架构

    前 言 三此君看了好几本书 xff0c 看了很多遍源码整理的 一张图进阶 RocketMQ 图片链接 xff0c 关于 RocketMQ 你只需要记住这张图 xff01 如果你第一次看到这个系列 xff0c 墙裂建议你打开链接 觉得不错的话