RocketMQ吐血总结

2023-10-26

RocketMQ吐血总结

架构

概念模型

最基本的概念模型与扩展后段概念模型

存储模型

RocketMQ吐血总结

User Guide

  • RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为了Apache顶级项目;早期阿里曾经基于ActiveMQ研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
  • NameServer可以部署多个,相互之间独立,其他角色同时向多个NameServer机器上报状态信息,从而达到热备份的目的。 NameServer本身是无状态的,也就是说NameServer中的Broker、Topic等状态信息不会持久存储,都是由各个角色定时上报并 存储到内存中的(NameServer支持配置参数的持久化,一般用不到)。
  • 为何不用ZooKeeper?ZooKeeper的功能很强大,包括自动Master选举等,RocketMQ的架构设计决定了它不需要进行Master选举, 用不到这些复杂的功能,只需要一个轻量级的元数据服务器就足够了。值得注意的是,NameServer并没有提供类似Zookeeper的watcher机制, 而是采用了每30s心跳机制。
  • 心跳机制
    • 单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息, 如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
    • Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳, 就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡(rebalance)。
    • 生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。 在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。
  • Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesrv发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。
  • 每个主题可设置队列个数,自动创建主题时默认4个,需要顺序消费的消息发往同一队列,比如同一订单号相关的几条需要顺序消费的消息发往同一队列, 顺序消费的特点的是,不会有两个消费者共同消费任一队列,且当消费者数量小于队列数时,消费者会消费多个队列。至于消息重复,在消 费端处理。RocketMQ 4.3+支持事务消息,可用于分布式事务场景(最终一致性)。
  • 关于queueNums:
    • 客户端自动创建,Math.min算法决定最多只会创建8个(BrokerConfig)队列,若要超过8个,可通过控制台创建/修改,Topic配置保存在store/config/topics.json
    • 消费负载均衡的最小粒度是队列,Consumer的数量应不大于队列数
    • 读写队列数(writeQueueNums/readQueueNums)是RocketMQ特有的概念,可通过console修改。当readQueueNums不等于writeQueueNums时,会有什么影响呢?
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
    if (topicRouteData != null) {
        for (QueueData data : topicRouteData.getQueueDatas()) {
            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
            data.setReadQueueNums(queueNums);
            data.setWriteQueueNums(queueNums);
        }
    }
  • Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上。Producer的发送机制保证消息尽量平均分布到 所有队列中,最终效果就是所有消息都平均落在每个Broker上。
  • RocketMQ的消息的存储是由ConsumeQueue和CommitLog配合来完成的,ConsumeQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。 如果某个消息只在CommitLog中有数据,而ConsumeQueue中没有,则消费者无法消费,RocketMQ的事务消息实现就利用了这一点。
    • CommitLog:是消息主体以及元数据的存储主体,对CommitLog建立一个ConsumeQueue,每个ConsumeQueue对应一个(概念模型中的)MessageQueue,所以只要有 CommitLog在,ConsumeQueue即使数据丢失,仍然可以恢复出来。
    • ConsumeQueue:是一个消息的逻辑队列,存储了这个Queue在CommitLog中的起始offset,log大小和MessageTag的hashCode。每个Topic下的每个Queue都有一个对应的 ConsumeQueue文件,例如Topic中有三个队列,每个队列中的消息索引都会有一个编号,编号从0开始,往上递增。并由此一个位点offset的概念,有了这个概念,就可以对 Consumer端的消费情况进行队列定义。
  • RocketMQ的高性能在于顺序写盘(CommitLog)、零拷贝和跳跃读(尽量命中PageCache),高可靠性在于刷盘和Master/Slave,另外NameServer 全部挂掉不影响已经运行的Broker,Producer,Consumer。
  • 发送消息负载均衡,且发送消息线程安全(可满足多个实例死循环发消息),集群消费模式下消费者端负载均衡,这些特性加上上述的高性能读写, 共同造就了RocketMQ的高并发读写能力。
  • 刷盘和主从同步均为异步(默认)时,broker进程挂掉(例如重启),消息依然不会丢失,因为broker shutdown时会执行persist。 当物理机器宕机时,才有消息丢失的风险。另外,master挂掉后,消费者从slave消费消息,但slave不能写消息。
  • RocketMQ具有很好动态伸缩能力(非顺序消息),伸缩性体现在Topic和Broker两个维度。
    • Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
    • Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后向Namesrv注册,Producer、Consumer通过Namesrv 发现新Broker,立即跟该Broker直连,收发消息。
  • Producer: 失败默认重试2次;sync/async;ProducerGroup,在事务消息机制中,如果发送消息的producer在还未commit/rollback前挂掉了,broker会在一段时间后回查ProducerGroup里的其他实例,确认消息应该commit/rollback
  • Consumer: DefaultPushConsumer/DefaultPullConsumer,push也是用pull实现的,采用的是长轮询方式;CLUSTERING模式下,一条消息只会被ConsumerGroup里的一个实例消费,但可以被多个不同的ConsumerGroup消费,BROADCASTING模式下,一条消息会被ConsumerGroup里的所有实例消费。
  • DefaultPushConsumer: Broker收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(5s),然后在check。当一直没有新消息,第三次check时,等待时间超过suspendMaxTimeMills(15s),就返回空结果。在等待的过程中,Broker收到了新的消息后会直接调用notifyMessageArriving返回请求结果。“长轮询”的核心是,Broker端Hold住(挂起)客户端客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。长轮询方式的局限性,是在Hold住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
  • DefaultPullConsumer: 需要用户自己处理遍历MessageQueue、保存Offset,所以PullConsumer有更多的自主性和灵活性。
  • 对于集群模式的非顺序消息,消费失败默认重试16次,延迟等级为3~18。(messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h")
  • MQClientInstance是客户端各种类型的Consumer和Producer的底层类,由它与NameServer和Broker打交道。如果创建Consumer或Producer 类型的时候不手动指定instanceName,进程中只会有一个MQClientInstance对象,即当一个Java程序需要连接多个MQ集群时,必须手动指定不同的instanceName。需要一提的是,当消费者(不同jvm实例)都在同一台物理机上时,若指定instanceName,消费负载均衡将失效(每个实例都将消费所有消息)。另外,在一个jvm里模拟集群消费时,必须指定不同的instanceName,否则启动时会提示ConsumerGroup已存在。

More

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ吐血总结 的相关文章

  • Java:如何将ArrayList作为对象的实例变量?

    我正在开展一个班级项目 用 Java 构建一个小型 Connect4 游戏 我目前的想法是拥有一类 Columns 它具有一些整数 索引 最大长度 isFull 作为实例变量 以及一个 ArrayList 来接收上面的整数和每个玩家的表现
  • 知道 akka actor 何时完成

    有几个人和我一起从事一个项目 一直在试图找出解决这个问题的最佳方法 看起来这应该是经常需要的标准东西 但由于某种原因我们似乎无法得到正确的答案 如果我有一些工作要做 并且我向路由器抛出一堆消息 我如何知道所有工作何时完成 例如 如果我们正在
  • Servlet 包含 Tomcat 中的 HTTP 标头

    我有一个 servlet 它的请求调度程序包含另一个 servlet 包含的 servlet 设置了我想在包括小服务程序 因此 我在 include 方法中传入一个自定义 HTTPResponse 对象 该对象捕获来自 servlet 的所
  • 为什么要分离接口和实现?

    在生产代码中 我经常看到定义如下的类 public interface SomeComponent Some methods public class SomeComponentImpl implements SomeComponent S
  • 我的代码中出现 ArrayIndexOutOfBoundsException 的原因是什么?

    我正在 Java 中实现凸包的格雷厄姆扫描算法 我在运行代码时收到此错误 对于输入字符串 10 18 Exception in thread main java lang ArrayIndexOutOfBoundsException 0 a
  • 何时使用环境变量与系统属性?

    我想知道以下哪种方法是首选方法 我们可以将事情设置为APP HOME path to file export in profile或类似的东西 并将其访问为System getenv APP HOME 或者 也可以使用属性作为 DAPP H
  • Java - Servlet 的默认 contentType

    在servlet中 通常我们会指定一个contentType 然后我们就可以打印出html代码了 response setContentType text html PrintWriter out response getWriter 如果
  • 如果Jetty的密钥库中有多个证书,它如何选择?

    我们的系统中有一些代码用于自动将自签名证书生成到密钥库中 然后由 Jetty 使用 如果给定主机的密钥已经存在 那么什么也不会发生 但如果它不存在 我们会生成一个新密钥 如下所示 public void generateKey String
  • 如何设置 Eclipse 以停止发生异常的线路?

    如何设置 Eclipse 在发生异常时停止 我有一个 Eclipse 断点设置来在异常时中断 在下面的代码示例中 我遇到的问题是 Eclipse 尝试打开 Integer 源代码 有没有办法让调试器在我的代码示例中显示的位置中断 如果我向下
  • Java TreeMap时间复杂度-lowerKey

    时间复杂度是多少lowerKey Java实现中的操作TreeMap 我认为它是 log n 但我在文档中找不到它 更基本操作的复杂性已有详细记录 此实现提供了有保证的 log n 时间成本 containsKey 获取 放置和删除操作 顺
  • cipher.update在java中做什么?

    我正在实施 DES CBC 我很困惑什么cipher init cipher update and cipher dofinal做 我只是使用 init 来设置密钥dofinal得到结果 我不使用更新 那是对的吗 另外使用时结果有什么不同U
  • 需要在 java api 中的 Solr 搜索中搜索文本及其周围的几行

    我正在使用 solr 7 7 2 并且我使用 solrj 在 Solr 中编写了一个 Java 程序 该程序在一个巨大的文本文件中搜索单词 我使用以下代码来显示代表整个文本的搜索结果 SolrQuery params new SolrQue
  • 如何从java程序中编译.java文件[重复]

    这个问题在这里已经有答案了 可能的重复 从 Java 内部编译外部 java 文件 https stackoverflow com questions 10889186 compiling external java files from
  • 将 XML 转换为 Java 对象 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • int 到 long 赋值

    我一直在尝试这个 int 和 long 转换 我尝试分配一个int变量为along多变的 代码如下 public static void main String args int i 1024 long j i long k i i i i
  • 什么是样板代码、热点代码和热点?

    我知道这些术语是在性能实现 优化的背景下使用的 最近一直在研究这个问题 并尝试过搜索 但没有得到任何例子 清楚地阐述 描述这些概念以及在现实世界开发场景中实现这些问题 概念 有人可以彻底解释这些术语 示例场景以及可能使用这些概念和术语的地方
  • 如何使用 Java 11 和 JavaFX 11 运行 ControlsFX 示例应用程序

    ControlFX 网站 http fxexperience com controlsfx says 如果您想使用 ControlsFX 示例应用程序 只需 下载 ControlsFX 版本并在上运行以下命令 命令提示符 请务必将 替换为实
  • Activity 在 Android 上创建两次

    首先 我是 Android 开发新手 所以请耐心等待 我将从用户界面开始 我有一个按钮 一旦您点击它 就会启动一个活动以获取结果 public class GUIActivity extends Activity Override publ
  • struts2中如何访问url参数

    我正在做一个struts2项目 我在项目中创建了 url 并使用标签传递了参数 我的问题是如何读取操作中的参数 另外 如果执行相同的操作 我将能够将参数视为查询字符串 我问是因为我无法做到 而且我在其中一个教程中看到了它 通常 您将通过使用
  • 从 Web 服务器异步调用应用程序

    我有一个用 Spring 制作的 在 Tomcat 上运行的 Web 应用程序 在同一台机器上有一个普通的 Java 应用程序 我想通过从Web服务器调用Java应用程序来执行它 但我想让应用程序不会使用服务器的资源 它涉及分类器的训练 因

随机推荐

  • Hibernate参数校验报错:No validator'javax.validation.constraints.Size' validating type 'java.lang.Integer'.

    javax validation UnexpectedTypeException HV000030 No validator could be found for constraint javax validation constraint
  • Python网络爬虫学习笔记(三)正则表达式

    正则表达式 正则表达式是处理字符串的强大工具 它有自己特定的语法结构 有了它 实现字符串的检索 替换 匹配验证 1 实例引入 正则表达式匹配 也就是用一定的规则将特定的文本提取出来 开源中国提供了正则表达式测试工具 https tool o
  • 虚拟机升级glibc(libc), 导致段错误等问题

    由于确实glibc高版本 需要升级glibc 导致出现段错误等信息 只剩下pwd cd等命令可以执行 这个时候需要靠补全命令查询到原系统使用的libc 2 xx文件 然后使用sln 原系统的重新索引libc so 6文件 sln lib64
  • SOA是什么?

    写这样的blog很容易被人砸砖头 而且我现在在专心做BPEL的研究 http hongsoft iteye com admin blogs 287353 也没有必要现在趟这个混水 不过想想 还是有话要说 定义 SOA是一种做架构的范式 这个
  • FreeSwitch数据库

    Freeswitch数据库 一 ODBC DSN 1 概念 ODBC 开放数据库连接 Open Database Connectivity ODBC https baike baidu com item ODBC 是为解决异构数据库间的数据
  • 线性回归(两种方式代码实现)

    方式一 最小二乘法 正规方程 公式推导 其中 代码实现 1 导入库 import numpy as np from sklearn datasets import load boston boston load boston x bosto
  • 前端面试总结及建议

    最近 由于项目组刚成立不久 团队处于天地初开的混沌状态 人员配置不齐 急需一大股新鲜血液融入 为此 开启了一段时间与求职面试者的博弈之路 如今的IT大环境 似乎每个公司一年四季都处于招人状态 而同时又有一大批无论是离职还是在职人员期许找一个
  • Linux操作命令笔记

    Linux Linux的字母大小写 下载和卸载 软件更新 查看空间使用情况 当前目录所在的位置 查看文件中的内容 查看目录下的文件 重启 关机 移动文件 磁盘管理软件 修改权限 删除文件或文件夹 新建文件夹 移动一个文件夹 文件重命名 编译
  • CMake中define_property的使用

    CMake中的define property命令用于定义和记录自定义属性 其格式如下 define property
  • 轻量微调和推理stanford_alpca

    当前的Alpaca模型是在Self Instruct论文中使用的技术生成的52K条指令数据 从7B LLaMA模型微调而来 并进行了一些修改 A10 gpu显存 22G cu117 驱动470 103 01 absl py 1 4 0 ac
  • 图形化界面

    文章目录 一 引入图形化界面 二 关于Easyx的基本函数操作 2 1颜色配比函数 2 2EasyX的坐标 有负数区分 2 3窗口函数函数 三 关于Easyx的实际操作 代码实现 3 1颜色操作的代码实现原理 3 2坐标操作的代码实现原理
  • uview ,uniapp 的UI组件

    文档 https www uviewui com components picker html
  • 高手勿进!写给初中级程序员以及还在大学修炼的“准程序员”的成长秘籍

    1 不要畏惧英文 互联网上很多优秀的技术资源和社区的内容都是英文 如果畏惧英文 就没办法从中汲取富有营养的知识 也没办法跟上技术的发展潮流 那么技术能力就会很难再上一层 技术的眼界就会受限 所以需要去突破语言这关 A 从改变语言环境开始 你
  • 【最详细附源码】R语言4.3.0全新安装教程

    软件下载 软件 R语言 版本 4 3 0 语言 简体中文 大小 77 74M 安装环境 Win7及以上版本 64位操作系统 硬件要求 CPU 2 0GHz 内存 4G 或更高 下载通道 百度网盘丨64位下载链接 https pan baid
  • springcloud整合Hystrix,实现接口服务降级

    利用Hystrix对接口control层进行服务降级 新建子工程service03 作为测试Hystrix服务降级的微服务 pom xml
  • nginx 之 proxy_pass详解

    在nginx中配置proxy pass代理转发时 如果在proxy pass后面的url加 表示绝对根路径 如果没有 表示相对路径 把匹配的路径部分也给代理走 假设下面四种情况分别用 http 192 168 1 1 proxy test
  • 工作十年的程序员,总结的前端面试题!

    1 常用那几种浏览器测试 有哪些内核 Layout Engine 1 浏览器 IE Chrome FireFox Safari Opera 2 内核 Trident Gecko Presto Webkit 2 说下行内元素和块级元素的区别
  • 贝叶斯网络结构学习约束的帕累托(pareto)最优

    约束 无非要考虑两件事 一是算法的结果准确率 二是算法的运行速度 约束算法的帕累托 pareto 最优就是指在保证算法准确性不降的前提下 使得算法的运行速度达到最快 就实现了帕累托最优 在已知贝叶斯网络标准结构的情况下对贝叶斯网络结构进行学
  • 人工智能之数学(概率方面)

    我们经常使用的统计机器学习算法 或者是神经网络模型中 数学作为最基础的根基 融合了高等数学中的微分学 概率 线性代数 凸优化等方面 每一个方面深入后都是有很多的益处 但是本着先实用 在进行学习的原则 所以主要是理解相关数学符号 理解统计学习
  • RocketMQ吐血总结

    RocketMQ吐血总结 架构 概念模型 最基本的概念模型与扩展后段概念模型 存储模型 RocketMQ吐血总结 User Guide RocketMQ是一款分布式消息中间件 最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统 满足线