RocketMQ——NameServer和Broker

2023-05-16

RocketMQ——NameServer和Broker

文章目录

  • RocketMQ——NameServer和Broker
    • NameServer
      • NameServer功能
      • 为什么不用zookeeper?
    • Broker
      • Broker消息存储
      • Broker的HA


NameServer

NameServer功能

NameServer负责维护Producer和Consumer的配置信息、状态信息,并且协调各个角色的协同执行。通过NameServer各个角色可以了解到集群的整体信息,并且他们会定期向NameServer上报状态。
在 org.apache.rocketmq.namesrv.routeinfo 包下的RouteInfoManager类中,定义了许多变量,通过5个HashMap变量存储和维护集群的状态信息

    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  • BROKER_CHANNEL_EXPIRED_TIME定义了Broker向NameServer的汇报超时时长,默认是两分钟,如果超过两分钟则关闭Broker和NameServer连接通道,并将该Broker从brokerLiveTable中移除
  • HashMap<String/* topic */, List> topicQueueTable。该变量存储了所有的Topic,并以Topic的名字作为key,value是这个Topic下的消息列表,列表的长度即Master Broker的数量
public class QueueData implements Comparable<QueueData> {
    private String brokerName;//broker名
    private int readQueueNums;//读取queue数量
    private int writeQueueNums;//写入queue数量
    private int perm;
    private int topicSynFlag;//同步标识
}
  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable。变量存储同一个Broker的数据信息。以brokerName作为key,BrokerData作为value。在BrokerData类中包含了cluster属性、brokerName属性和brokerAddrs属性。由于在一个集群中,同一个brokerName中可能包含多台机器(一个Master和多个Slave),因此在BrokerData中用HashMap存储这些机器的brokerId和地址
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    }
  • HashMap<String/* clusterName /, Set<String/ brokerName */>> clusterAddrTable。这个变量存储了在集群下对应的brokerName有哪些
  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable。该变量存储每台broker机器对应的实时状态,通过上次更新时间戳(lastUpdateTimestamp)来计算Broker更新是否超时,判断该Broker是否失效。
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
    }
  • HashMap<String/* brokerAddr /, List/ Filter Server */> filterServerTable。该变量存储每台Broker关联的过滤服务器。

为什么不用zookeeper?

zookeeper是apache旗下用于分布式服务协调的开源软件,并且拥有选举机制,能够在master宕机时从slave中通过选举机制选出一台slave变成master。但是在NameServer的设计中,MasterBroker中没有一台拥有全部的Topic信息,消息分布平均,失去选举机制意义。其次,NameServer仅仅被用于存储集群的配置信息、元数据信息,不需要太复杂的功能,所以放弃重量级的zookeeper选择轻量级的NameServer。

Broker

Broker消息存储

在Broker中存在着两个角色:CommitLog和ConsumeQueue。

  • CommitLog:实际储存消息的物理文件
  • ConsumeQueue:消费队列,队列中存储的是待消费消息的地址偏移量,类似于索引

通过CommitLog和ConsumeQueue相互配合完成消息的存储。
在这里插入图片描述

消息的存储地址在配置文件中通过 storePathRootDir 进行配置。在存储路径下的存储目录结构如下

/home
└── rocketmq
    ├── store-a
    │   ├── abort
    │   ├── checkpoint
    │   ├── commitlog
    │   │   └── 00000000000000000000
    │   ├── config
    │   │   ├── consumerFilter.json
    │   │   ├── consumerFilter.json.bak
    │   │   ├── consumerOffset.json
    │   │   ├── consumerOffset.json.bak
    │   │   ├── delayOffset.json
    │   │   ├── delayOffset.json.bak
    │   │   ├── subscriptionGroup.json
    │   │   ├── topics.json
    │   │   └── topics.json.bak
    │   ├── consumequeue
    │   │   ├── OFFSET_MOVED_EVENT
    │   │   │   └── 0
    │   │   │       └── 00000000000000000000
    │   │   ├── OrderTest
    │   │   │   ├── 0
    │   │   │   │   └── 00000000000000000000
    │   │   │   ├── 1
    │   │   │   │   └── 00000000000000000000
    │   │   │   ├── 2
    │   │   │   │   └── 00000000000000000000
    │   │   │   └── 3
    │   │   │       └── 00000000000000000000
    │   ├── index
    │   │   └── 20190722164917530
    │   └── lock

可以看到在存储目录中有Commit Log、ConsumeQueue、index等, ConsumeQueue 目录中存储了所有的Topic信息,以及index索引文件。CommitLog采用顺序写、随机读的方式加快写入效率,并且由于ConsumeQueue中仅存储20字节的偏移量、Tags等,所以能够存进内存当中,读取速度也很快。
从生产者读取到消息后会将消息存储到本地磁盘,RocketMQ提供两个存储方式:同步刷盘和异步刷盘。

  • 同步刷盘:同步刷盘方式在Broker读取到生产者发送的消息后立即从内存写入磁盘,并且返回成功状态,生产者接收到成功状态时消息已经被写入磁盘。
  • 异步刷盘:异步刷盘的方式将收到的信息在内存中存储,并且返回成功状态,生产者收到成功状态时消息还在内存中并未写入磁盘。当内存中消息堆叠到阈值时,批量将消息写入磁盘。

Broker的HA

Broker的集群有master和slave两种角色,通过在配置文件中配置实现。Broker集群中master和slave区别如下:

  • master配置文件brokerId为0;slave为其他值
  • master支持读和写;slave只能读
  • 生产者只能和master连接写消息;消费者可以连接master也可以链接slave读消息
    RocketMQ并没有自动将slave转成master的机制,因此当master资源不足或失效时,需要手动修改配置文件,利用新的配置文件重新启动Broker

在Broker组中,需要将master中的消息复制各个slave中以达到消息同步的目的。RocketMQ提供两种消息复制的机制:同步复制和异步复制。

  • 同步复制:等到master和slave均写入成功才返回成功状态。该方式如果master出现故障数据易恢复,但是吞吐量低
  • 异步复制:只要master写入成功就返回成功状态。该方式延迟低、吞吐量高,但是不知道slave是否成功写入master故障消息就可能会丢失
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ——NameServer和Broker 的相关文章

  • Rocketmq-- RocketMQ4.X基础介绍

    一 阿里开源消息队列 RocketMQ4 x介绍和新概念讲解 Apache RocketMQ作为阿里开源的一款高性能 高吞吐量的分布式消息中间件 1 1 特点 支持Broker和Consumer端消息过滤 支持发布订阅模型 和点对点 支持拉
  • linux下rocketmq安装-单机

    1 环境准备 jdk 这里用的jdk8 maven 3 6 1 这里说一下maven安装 下载maven 3 6 1 root devops 02 wget https archive apache org dist maven maven
  • 【Linux系统安装RocketMQ并整合到SpringBoot项目】

    Linux系统安装RocketMQ并整合到SpringBoot项目 一 基本概念 1 1 NameServer 1 2 Broker 1 3 Message 1 3 Topic 1 4 Tag 1 5 Queue 1 6 MessageId
  • rocketmq安装、启动

    1 下载 gt wget http mirror bit edu cn apache rocketmq 4 4 0 rocketmq all 4 4 0 source release zip gt unzip rocketmq all 4
  • Docker实战:docker compose 搭建Rocketmq

    1 配置文件准备 1 1 新建目录 home docker data rocketmq conf mkdir home docker data rocketmq conf 1 2 在上面目录下新建文件broker conf文件 内容如下 b
  • RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码

    此前我们学习了Broker和Producer的启动源码 以及Producer发送消息源码和Broker接收存储消息的源码 现在 我们来学习Consumer的启动以及消费消息的源码 Consumer的启动源码和Producer的启动源码还是有
  • RocektMQ社区"每周直播分享第7期"如约而至

    各位同学 RocektMQ社区 每周直播分享第7期 如约而至 分享题目 RocketMQ消息消费概述 直播方式 钉钉群直播方式 群号 21791227 分享时间 2019 01 17 20 00 21 30 本周四 分享讲师 费红健 内容简
  • RocketMQ-如何保证顺序消息

    1 简介 实际开发中会有以下场景 需要保证一组消息的生产顺序与消费顺序相同 例如 监听数据库表单条数据的的多次修改 需要保证监听者最终得到的消息顺序和数据库表对单条数据的修改顺序一样 网购平台创建订单的过程一般都是异步实现 订单创建和支付流
  • rocketmq顺序发送消息

    1 概念 严格顺序消息模式下 消费者收到的所有消息均是有顺序的 消息有序指的是可以按照消息的发送顺序来消费 FIFO RocketMQ可以严格的保证消息有序 可以分为分区有序或者全局有序 顺序消费的原理解析 在默认的情况下消息发送会采取Ro
  • Flink RocketMQ Connector实现

    Flink内置了很多Connector 可以满足大部分场景 但是还是有一些场景无法满足 比如RocketMQ 需要消费RocketMQ的消息 需要自定时Source 一 自定义FlinkRocketMQConsumer 参考FlinkKaf
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • 【RocketMQ】NameServer总结

    NameServer是一个注册中心 提供服务注册和服务发现的功能 NameServer可以集群部署 集群中每个节点都是对等的关系 没有像ZooKeeper那样在集群中选举出一个Master节点 节点之间互不通信 服务注册 Broker启动的
  • RocketMQ消费者端消息列队六种负载均衡算法分析

    在RocketMQ启动的时候会启动负载均衡线程 过程如下 DefaultMQPullConsumerImpl start mQClientFactory start 上面点进去 gt MQClientInstance start rebal
  • rocketmq客户端配置

    1 客户端配置 相对于RocketMQ的Broker集群 生产者和消费者都是客户端 2 客户端寻址方式 RocketMQ可以令客户端找到Name Server 然后通过Name Server再找到Broker 如下所示有多种配置方式 优先级
  • 22道常见RocketMQ面试题以及答案

    面试宝典到手 搞定面试 不再是难题 系列文章传送地址 请点击本链接 1 RocketMQ是什么 2 RocketMQ有什么作用 3 RoctetMQ的架构 4 RoctetMQ的优缺点 8 消息过滤 如何实现 9 消息去重 如果由于网络等原
  • RocketMQ经典高频面试题大全(附答案)

    编程界的小学生 0 彩蛋 1 说说你们公司线上生产环境用的是什么消息中间件 2 多个mq如何选型 3 为什么要使用MQ 4 RocketMQ由哪些角色组成 每个角色作用和特点是什么 5 RocketMQ中的Topic和JMS的queue有什
  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • springboot-rocketmq日志rocketmq_client.log问题

    问题描述 springboot配置rocketmq后 会写入日志到rocketmqlogs目录下的rocketmq client log文件中 且日志过于庞大 解决 1 启动类增加代码 System setProperty ClientLo
  • Kafka:动态更新 jaas 配置

    我已经使用 kafka 设置了 jaas 配置sasl jaas config财产 我想更新此配置并动态添加用户 根据这个文档 http kafka apache org 11 documentation html dynamicbroke

随机推荐

  • -bash: java: command not found (Linux)

    原因 xff1a 安装jdk后没有配置环境变量 1 编辑配置文件 xff0c 配置环境变更 vim etc profile 在最下面添加 export JAVA HOME 61 usr local jdk8 export PATH 61 P
  • idea使用本地代码远程调试线上运行代码---windows环境

    场景 xff1a 今天在书上看了一个代码远程调试的方法 xff0c 自己本地验证了一下感觉十分不错 xff01 xff01 windows环境 xff1a 启动测试jar包 xff1a platform multiappcenter bas
  • anaconda:安装cuda和对应版本的cudnn

    复现别人论文的时候经常遇到不同的cuda版本 xff0c 可以使用anaconda创建虚拟环境 xff0c 并在不同的虚拟环境中配置对应的cuda版本 1 安装anaconda及虚拟环境使用 Anaconda多个python版本 xff08
  • Linux Server 种脚本自动执行

    在我们用python编写完脚本后 xff0c 时常需要定时运行我们的脚本 在这里 xff0c 我为大家介绍两种常用定时执行python脚本文件的方式 xff1a 第一种 xff1a crontab job 在Linux系统中可以通过设置cr
  • Tomcat9配置HTTP/2

    1 概述 Tomcat从Tomcat8的一些较新版本就支持HTTP 2了 xff0c Tomcat9直接支持 xff0c 本文首先讲述了相关HTTP 2的特性 xff0c 接着利用一个简单的开源工具mkcert生成证书并利用该证书配置HTT
  • SVN提交代码报错,怎么破?

    目录 SVN提交代码报错1 SVN提交被锁定 xff08 locked xff09 2 SVN提交已存在版本控制信息 xff08 is already under version control xff09 SVN提交代码报错 1 SVN提
  • Hive隐藏分割字符\001替换为可见字符

    Hive默认的分隔符是 001 xff0c 属于不可见字符 xff0c 这个字符在vi里是 A 一个文本0000 0 xff0c 直接cat内容如下 xff1a 320643204N2559613979 320828796N446323 3
  • 计算机毕业设计 HTML+CSS+JavaScript食品餐饮行业网站(10页)

    x1f380 精彩专栏推荐 x1f447 x1f3fb x1f447 x1f3fb x1f447 x1f3fb 作者简介 一个热爱把逻辑思维转变为代码的技术博主 x1f482 作者主页 主页 x1f680 获取更多优质源码 x1f393 w
  • 基于Redis实现的布隆过滤器

    一 RedisTemplate 1 首先将guava实现的本地的布隆过滤器的算法代码拿过来 span class token comment 算法过程 xff1a 1 首先需要k个hash函数 xff0c 每个函数可以把key散列成为1个整
  • Canal和Kafka整合方案——解决Canal写入Kafka并发消费问题

    文章目录 一 问题描述二 引入Kafka1 Canal整合Kafka及项目初步搭建2 整合Kafka后引出新问题 三 最终方案1 修改Canal配置文件2 修改项目代码3 整体架构4 结果验证 四 总结思考五 参考 一 问题描述 在使用Ca
  • 解决项目版本冲突——maven-shade插件使用

    文章目录 背景maven shade plugin介绍解决问题1 环境准备2 解决方案3 引入依赖 一些需要注意的坑maven shade plugins的其他使用 背景 当我们在maven项目中引入第三方组件时 xff0c 三方组件中的依
  • 通关剑指 Offer——剑指 Offer II 055. 二叉搜索树迭代器

    1 题目描述 剑指 Offer II 055 二叉搜索树迭代器 实现一个二叉搜索树迭代器类BSTIterator xff0c 表示一个按中序遍历二叉搜索树 xff08 BST xff09 的迭代器 xff1a BSTIterator Tre
  • 通关剑指 Offer——剑指 Offer II 056. 二叉搜索树中两个节点之和

    1 题目描述 剑指 Offer II 056 二叉搜索树中两个节点之和 给定一个二叉搜索树的 根节点 root 和一个整数 k 请判断该二叉搜索树中是否存在两个节点它们的值之和等于 k 假设二叉搜索树中节点的值均唯一 示例 1 xff1a
  • LeetCode 每日一题——1759. 统计同构子字符串的数目

    1 题目描述 1759 统计同构子字符串的数目 难度中等43 给你一个字符串 s xff0c 返回 s 中 同构子字符串 的数目 由于答案可能很大 xff0c 只需返回对 109 43 7 取余 后的结果 同构字符串 的定义为 xff1a
  • LeetCode 每日一题——1750. 删除字符串两端相同字符后的最短长度

    1 题目描述 1750 删除字符串两端相同字符后的最短长度 给你一个只包含字符 a xff0c b 和 c 的字符串 s xff0c 你可以执行下面这个操作 xff08 5 个步骤 xff09 任意次 xff1a 选择字符串 s 一个 非空
  • LeetCode 每日一题——2032. 至少在两个数组中出现的值

    1 题目描述 2032 至少在两个数组中出现的值 给你三个整数数组 nums1 nums2 和 nums3 xff0c 请你构造并返回一个 元素各不相同的 数组 xff0c 且由 至少 在 两个 数组中出现的所有值组成 数组中的元素可以按
  • 解决win10使用电池时自动调节亮度问题

    问题描述 Win10笔记本在使用电池时会出现根据界面窗口颜色自动调节亮度的问题 xff0c 特别是在使用Idea或Pycharm之类的在切换窗口时的亮度调节会让人感觉崩溃 为了解决这个问题 xff0c 上网找了很多解决方案 xff0c 大多
  • 使用Python一步步实现PCA算法

    使用Python一步步实现PCA算法 标签 xff1a PCA Python 本文原地址为 xff1a http sebastianraschka com Articles 2014 pca step by step html Implem
  • RocketMQ——生产者和消费者

    RocketMQ 生产者和消费者 文章目录 RocketMQ 生产者和消费者RocketMQ简介RocketMQ生产者RocketMQ消费者DefaultMQPushConsumerDefaultMQPullConsumer RocketM
  • RocketMQ——NameServer和Broker

    RocketMQ NameServer和Broker 文章目录 RocketMQ NameServer和BrokerNameServerNameServer功能为什么不用zookeeper xff1f BrokerBroker消息存储Bro