Rocketmq原理&最佳实践

2023-11-05

一、 MQ背景&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
  • 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
• 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
• 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
• 支持18个级别的延迟消息(rabbitmq和kafka不支持)
• 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
• 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
• 支持重复消费(rabbitmq不支持,kafka支持)

Rocketmq、kafka、Rabbitmq的详细对比,请参照下表格:

 

image.png

二、RocketMQ集群概述

1. RocketMQ集群部署结构

 

image.png

1) Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

2) Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

3) Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4) Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。

如果有需要,可以查看Rocketmq更多源码解析

三、 Rocketmq如何支持分布式事务消息

场景

A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),B和MQ保证事务一致(通过重试),从而达到最终事务一致性。

原理:大事务 = 小事务 + 异步

1. MQ与DB一致性原理(两方事务)

流程图

image.png

上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。

MQ消息、DB操作一致性方案:

1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。

2)执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。

3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。

4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。

说明:

上面以DB为例,其实此处可以是任何业务或者数据源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的状态,在MQ服务器内部是一个数字。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者rocketmq集群挂了的情况下。当rocketmq集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用Rocketmq解决分布式事务问题,建议选择同步刷盘模式。

2. 多系统之间数据一致性(多方事务)

image.png

当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过Rocketmq的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

以上图交易系统为例:

1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMq事务性消息保证一致性

2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。

3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMq的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。 PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。 回滚任务失败,通过MQ的重发来重试。

以上是交易系统和其他系统之间保持最终一致性的解决方案。

3.案例分析

1) 单机环境下的事务示意图

如下为A给B转账的例子。

步骤 动作
1 锁定A的账户
2 锁定B的账户
3 检查A账户是否有1元
4 A的账户扣减1元
5 给B的账户加1元
6 解锁B的账户
7 解锁A的账户

以上过程在代码层面甚至可以简化到在一个事物中执行两条sql语句。

2) 分布式环境下事务

和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事物来实现。此时可以通过一下方式实现,将转账操作分成两个操作。

a) A账户

步骤 动作
1 锁定A的账户
2 检查A账户是否有1元
3 A的账户扣减1元
4 解锁A的账户

b) MQ消息
A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。

c) B账户

步骤 动作
1 锁定B的账户
2 给B的账户加1元
3 解锁B的账户

四、 顺序消息

1. 顺序消息缺陷

发送顺序消息无法利用集群Fail Over特性消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。

2. 原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。

注意:把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

3. 扩展

可以通过实现发送消息的对列选择器方法,实现部分顺序消息。

举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为对列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费

五、 最佳实践

1. Producer

1) Topic

一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags 在broker做消息过滤。

2) key

每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key 尽可能唯一,这样可以避免潜在的哈希冲突。

//订单Id

String orderId= "20034568923546";

message.setKeys(orderId);

3) 日志

消息发送成功或者失败,要打印消息日志,务必要打印 send result 和key 字段。

4) send

send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。

SEND_OK:消息发送成功

FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

2. Consumer

1) 幂等

RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息,此时务必做好幂等。

2) 日志

消费时记录日志,以便后续定位问题。

3) 批量消费

尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。

六、 参考资料

1. 文档

RocketMQ_design.pdf
RocketMQ_experience.pdf

2. 博客

分布式开放消息系统(RocketMQ)的原理与实践

http://www.jianshu.com/p/453c6e7ff81c

RocketMQ事务消费和顺序消费详解

http://www.cnblogs.com/520playboy/p/6750023.html

ZeroCopy

http://www.linuxjournal.com/article/6345

IO方式的性能数据

http://stblog.baidu-tech.com/?p=851

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Rocketmq原理&最佳实践 的相关文章

  • C语言 消息队列

    消息队列 xff08 也叫做报文队列 xff09 能够克服早期unix通信机制的一些缺点 作为早期unix通信机制之一的信号能够传送的信息量有限 xff0c 后来虽然POSIX 1003 1b在信号的实时性方面作了拓广 xff0c 使得信号
  • rocketMq中文文档

    title 用户指引 date 2017 12 29 categories 文档翻译 为什么是RocketMQ 动机 在早期阶段 我们在ActiveMQ 5 x 早于5 3 的基础上构建我们的分布式消息中间件 我们的跨国业务使用它来实现异步
  • Springboot整合RabbitMQ

    一 Springboot整合RabbitMQ的代码实现 1 引入Springboot整合RabbitMQ的依赖
  • RocketMQ和kafka

    RocketMQ 分为集群消息 一组中只有一个消费者竞争到消息 和广播消息 组内消费者都会消费消息 相关概念有 topic 一个消息的主题 一级分类 tag 消息的二级分类 queque 消息队列 brocker里直接存储消息就是在queq
  • Linux进程间通信-消息队列

    首先上篇文章我们说到了Linux下进行进程间通信的一种方法或机制匿名管道和命名管道 那么这里要说的是另外一种与之不同的通信方法 即消息队列 两者之间有相同也有不同的地方 具体的下面就一一介绍 一 什么是消息队列 首先它也是一种进行进程间通信
  • RocketMQ第二篇 单机版安装操作步骤

    MQ下载地址 下载RocketMQ 4 7 1版本 RocketMQ运行版本下载地址 https archive apache org dist rocketmq 4 7 1 rocketmq all 4 7 1 bin release z
  • RabbitMQ高级特性-Confirm确认消息

    Confirm确认消息 消息的确认 是指生产者投递消息后 如果Broker收到消息 则会给我们产生一个应答 生产者进行接收应答 用来确定这条消息是否正常发送到Broker 这种方式也是消息的可靠性投递的核心保障 如何实现Confirm确认消
  • 4 SpringBoot整合RocketMQ实现消息发送和接收

    我们使用主流的SpringBoot框架整合RocketMQ来讲解 使用方便快捷 最终项目结构如下 具体步骤如下 第一步 我们新建一个父项目rocketmq test pom类型 主要是依赖管理 包括版本的管理 以及管理module子项目 p
  • RocketMQ系列之入门

    前言 之前我们把RMQ的多Master集群搭建起来了 我们今天就来看看如何向这个集群生产消息以及消费消息 集群搭建回顾 回顾上节的内容 我总结下以下几步 第一 最新版RMQ4 2 0要求最低JDK8版本 第二 修改虚拟机的host 配置na
  • 一文详解RabbitMQ,RocketMQ和Kafka的异同

  • (二)Rocketmq目录结构及设计目标

    文章目录 一 目录结构 二 设计理念与目标 2 1设计理念 2 2设计目标 一 目录结构 1 broker broker模块 2 client 消息客户端 包含消息生产者 消费者相关类 3 common 公共包 4 dev 开发者信息 非源
  • RabbitMQ消息队列实战(1)—— RabbitMQ的体系

    RabbitMQ是一个开源的消息代理和队列服务器 用来在不同的应用之间共享数据 1983年 被认为是RabbitMQ的雏形的Teknekron创建 首次提出了消息总线的概念 中间经历过数个阶段的发展 一直到2004年 AMQP Advanc
  • java 使用RabbitMQ示例

    RabbitMQ简介 RabbitMQ是一个受欢迎的消息代理 通常用于应用程序之间或者程序的不同组件之间通过消息来进行集成 具有高可用高并发的优点 适合集群服务器 采用 Erlang实现 对主要的编程语言都有客户端支持 RabbitMQ环境
  • 使用RabbitMQ实现延时队列

    之前公司是一个类电商公司 会有用户下单后未支付取消订单的场景 解决方案是使用RabbitMQ的死信队列来实现一个延时队列 下单时 将订单丢进消息队列 设置过期时间 订单失效时间 然后到时候检查订单状态 如果未支付则取消订单 1 什么是死信
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • Rabbitmq入门到进阶看这篇就够了!

    安装前提 安装 erlang windows用户名非中文 可以关注我的公众号 知识追寻者 回复 rabbitmq 获取已经下载好的安装包和配套源码地址 本套教程对应知识追寻者网址 windows安装rabbitmq zszxz com Ra
  • RocketMQ第五篇 RocketMQ API基本使用

    目录 生产者Product 消费者Consumer 前面已经学习了Rocket的基本知识 以及搭建MQ单机版和集群环境 下面开始进行实际开发 根据前面下载的RocketMQ源码 开展讲解RocketMQ 的基本使用 生产者Product 在
  • docker安装rocketmq4.6.1(精简版)

    一 创建文件 mkdir p usr local rocketmq server logs usr local rocketmq server store usr local rocketmq broker logs usr local r
  • RT-Thread记录(七、IPC机制之邮箱、消息队列)

    讲完了线程同步的机制 我们要开始线程通讯的学习 线程通讯中的邮箱消息队列也属于 RT Thread 的IPC机制 目录 前言 一 邮箱 1 1 邮箱控制块 1 2 邮箱操作 1 2 1 创建和删除 1 2 2 初始化和脱离 1 2 3 发送
  • 高可用:如何实现消息队列的 HA?

    管理学上有一个木桶理论 一只水桶能装多少水取决于它最短的那块木板 这个理论推广到分布式系统的可用性上 就是系统整体的可用性取决于系统中最容易出现故障 或者性能最低的组件 系统中的各个组件都要进行高可用设计 防止单点故障 消息队列也不例外 本

随机推荐

  • echart 实现可以点击下钻的地图

    codescanbox 封装成了一个类 通过 getLoadMap 来获取对应的实例对象 一个是单纯的地图 一个是可以打点的地图 getLoadMap 需要 3 个参数 echarts 实例 registerMap 注册地图的api typ
  • ionic3之js(jQuary),css,图片的引入

    一 js文件 以jQuary为例 相信有很多朋友使用不习惯angularjs 所以想使用已经很熟悉的JQuary 在这里我就给出怎么引入jQuary文件 并使用 1 把要引入的jQuary文件放到app下的assets目录下 2 在src下
  • go 进阶 请求代理相关: 三. ReverseProxy 负载均衡

    目录 一 ReverseProxy 负载均衡 简单随机负载均衡示例 简单轮询负载均衡示例 加权负载均衡示例 一致性Hash 二 反向代理添加负载均衡功能 一 ReverseProxy 负载均衡 ReverseProxy 支持负载均衡功能 提
  • IO之字节字符转换流

    1 转换流概述 转换流 可以将一个字节流转换为字符流 也可以将一个字符流转换为字节流 OutputStreamWriter 可以将输出的字符流转换为字节流的输出形式 InputStreamReader 将输入的字节流转换为字符流输入形式 2
  • 多文件编辑作业(2023.1.9)

    第一题 main c include head h int main int argc const char argv char str 10 abcdefg MyStrRev str char a hello StrRevRec a st
  • qt 在ui界面添加控件后在cpp文件中无法调用?

    问题 qt 在ui界面添加控件后在cpp文件中无法调用 解决方法 在build选项中选择 重新build项目 再次在cpp中调用添加的控件发现可以调用了 还有一种情况导致添加控件后无法调用 就是没有导入ui xxx h文件 xxx是ui界面
  • Python图像处理-4.pil调整图片尺寸和旋转角度

    from PIL import Image import matplotlib pyplot as plt pil im1 Image open pic1 png plt figure girlfriend1 plt imshow pil
  • 魔搭开源FaceChain个人写真项目,大幅提升写真多样性,登顶github趋势榜首!

    一 上周数据概览 一周时间获取超过3K star 连续在github trending榜单蝉联top 开发者们纷纷标记star GitHub modelscope facechain FaceChain is a deep learning
  • zlib库VS2017编译步骤

    点击这里下载zlib1 2 11源码 http zlib net zlib1211 zip 下载源码库 从上面给出的源码路径下载zlib源码库 如果不想自己编译 可以使用上面给出的二进制包直接使用 无视本文 编译步骤 编译方法一 解压源码文
  • MyBatis-plus 动态条件构造器总结

    MyBatis plus 动态条件构造器类结构图 MyBatis Plus条件构造器QueryWrapper对应常用SQL语法说明 函数 说明 SQL语法 eq 等于 ne 不等于 lt gt gt 大于 gt lt 小于 lt ge 大于
  • STM32单片机通过ESP8266WiFi模块与Android APP实现数据传输(一)---下位机硬件配置

    事务的难度远远低于对事物的恐惧 STM32F407单片机通过ESP8266 WiFi模块与Android 手机APP连接实现数据的相互传输 在单片机上通过LCD显示屏实时显示连接的状态以及互相传输的数据 先看效果图 STM32单片机 And
  • simulink教程(自动控制原理)

    1 启动simulink 命令行输入simulink或者 会弹出 2 点击blank model 出现新窗口 新建或者打开模型文件 There are two major classes of items in Simulink block
  • GLES3.0中文API-glDrawRangeElements

    名称 glDrawRangeElements 从数组数据渲染基元 C规范 void glDrawRangeElements GLenum mode GLuint start GLuint end GLsizei count GLenum t
  • Open mv识别三角形的办法

    文章目录 前言 带着问题来看 一 函数 二 使用方法 1 find line segments 2 img find template 三 摄像情况及终端结果 1 find line segments 2 img find template
  • 初始C语言——利用Ascll码进行字母大小写转换

    打开Ascll码表 你会发现大写字母和小写字母之间存在这样的关系 图片来自 https img blog csdnimg cn 54404234b42348d6a33bc1c4d5ab24e5 png 小写字母的值始终比大写字母多32 de
  • Node.js

    Node js Node js基础 概念 简单的说 Node js 就是运行在服务端的 JavaScript Node js 是一个基于Chrome JavaScript 运行时建立的一个平台 Node js是一个事件驱动I O服务端Jav
  • (五)决策树

    一 决策树 决策树是监督学习算法 下面为一些样本 本质上是一种特征去结果的相关度 比如你的信贷情况与能否还贷的相关度肯定高 而你有没有结婚的相关度肯定低 二 信息增益 三 ID3算法
  • php 未支付取消订单,【php】用户提交订单,30分钟后没付款取消订单功能分析

    我先在要做这样的功能 用户在创建订单后 订单表中记入的是未付款状态 如果用户在30分钟后 还未付款 然后就把该订单给取消 关于用户创建订单 30分钟后还没付款 取消该订单的逻辑是怎么实现的 我自己的想了两个方案 1 客户端记入这个订单 如果
  • MindNode 5 for Mac(思维导图软件)中文版

    绘制流程图 思维导图 规划图 信息图等自然少不了这款MindNode 5 for Mac 作为优质的思维导图软件 mindnode5 mac破解版的功能很全面 添加文字 链接 图片 扩展注释等非常便捷 而且mindnode 5 破解版会智能
  • Rocketmq原理&最佳实践

    一 MQ背景 选型 消息队列作为高并发系统的核心组件之一 能够帮助业务系统解构提升开发效率和系统稳定性 主要具有以下优势 削峰填谷 主要解决瞬时写压力大于应用服务能力导致消息丢失 系统奔溃等问题 系统解耦 解决不同重要程度 不同能力级别系统