分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践

2023-11-17

2016-12-23 22:08  7789人阅读  评论(8)  收藏  举报
  分类:

版权声明:本文为博主原创文章,未经博主允许不得转载。

目录(?)[+]

说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢? 还是先发送消息,后update DB?

假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 
假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论: 只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢??

错误的方案0

有人可能想到了,我可以把“发送消息”这个网络调用和update DB放在同1个事务里面,如果发送消息失败,update DB自动回滚。这样不就保证2个操作的原子性了吗?

这个方案看似正确,其实是错误的,原因有2:

(1)网络的2将军问题:发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢?还是消息已经收到了,只是返回response的时候失败了?

如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致A账号的钱没有扣,B账号的钱却加了。

(2)把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务。严重的,会block整个DB。这个风险很大。

基于以上分析,我们知道,这个方案其实是错误的!

方案1–业务方自己实现

假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?

解决方案如下: 
(1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。

(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。

(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?

消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。

通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。

方案2 – RocketMQ 事务消息

为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤: 
(1) 发送Prepared消息 
(2) update DB 
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

具体代码实现如下:

也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。

// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
   
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}
   
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

人工介入

可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?

消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?

答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?

对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践 的相关文章

  • 分布式消息队列RocketMQ 快速入门

    分布式消息队列RocketMQ 一 RocketMQ概述 概述 1 MQ简介 MQ Message Queue 是一种提供消息队列服务的中间件 是一套提供了消息生产 存储 消费全过程API的软件系统 2 MQ用途 限流削峰 MQ可以将系统的
  • 分布式开放消息系统(RocketMQ)的原理与实践

    分布式开放消息系统 RocketMQ 的原理与实践 作者 CHEN川 关注 2016 02 25 15 43 字数 6784 阅读 135462 评论 49 喜欢 351 赞赏 7 一年前为了一次内部分享而写的这篇文章 没想到会有这么多人阅
  • 【RocketMQ】设计理念与核心概念扫盲

    RocketMQ 设计理念与核心概念扫盲 文章目录 RocketMQ 设计理念与核心概念扫盲 一 RocketMQ的设计理念和目标 1 1 设计理念 1 2 设计目标 二 RocketMQ的核心概念扫盲篇 2 1 部署架构 2 1 1 Na
  • RocketMq客户端日志参数设置

    使用的RocketMq版本为4 7 1 RocketMq的客户端日志打印 Logger的创建代码在org apache rocketmq client log ClientLogger中 部分代码如下 public static final
  • MQClientException: CODE: 208  DESC: query message by key finished, but no message.

    2019 05 15 10 19 31 401 INFO closeChannel close the connection to remote address 127 0 0 1 10911 result true 2019 05 15
  • 【Linux系统安装RocketMQ并整合到SpringBoot项目】

    Linux系统安装RocketMQ并整合到SpringBoot项目 一 基本概念 1 1 NameServer 1 2 Broker 1 3 Message 1 3 Topic 1 4 Tag 1 5 Queue 1 6 MessageId
  • Docker实战:docker compose 搭建Rocketmq

    1 配置文件准备 1 1 新建目录 home docker data rocketmq conf mkdir home docker data rocketmq conf 1 2 在上面目录下新建文件broker conf文件 内容如下 b
  • RocketMQ部署之动态设置JVM启动参数

    这里是weihubeats 觉得文章不错可以关注公众号小奏技术 文章首发 拒绝营销号 拒绝标题党 背景 线上的RocketMQ集群有运行一段时间了 比如测试环境和线上环境的RocketMQ集群部署的机器内存大小肯定不一样 所以可能要写多个部
  • RocketMQ-如何保证顺序消息

    1 简介 实际开发中会有以下场景 需要保证一组消息的生产顺序与消费顺序相同 例如 监听数据库表单条数据的的多次修改 需要保证监听者最终得到的消息顺序和数据库表对单条数据的修改顺序一样 网购平台创建订单的过程一般都是异步实现 订单创建和支付流
  • RocketMQ Pull和Push

    在rocketmq里 consumer被分为2类 MQPullConsumer和MQPushConsumer 其实本质都是拉模式 pull 即consumer轮询从broker拉取消息 区别是 push方式里 consumer把轮询过程封装
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • 【RocketMQ】NameServer总结

    NameServer是一个注册中心 提供服务注册和服务发现的功能 NameServer可以集群部署 集群中每个节点都是对等的关系 没有像ZooKeeper那样在集群中选举出一个Master节点 节点之间互不通信 服务注册 Broker启动的
  • 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1 死信队列 1 1 死信特性 1 2 查看死信消息 2 重试次数参数 2 1 Producer端重试 2 2 Consumer端重试 3 1 异常重试 3 2 超时重试 参考 1 死信队列 上一篇 RocketMQ 消息重试中我
  • 1 RocketMQ简介

    简介 RocketMQ是由阿里捐赠给Apache的一款低延迟 高并发 高可用 高可靠的分布式消息中间件 经历了淘宝双十一的洗礼 RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力 同时也具备互联网应用所需的海量消息堆积 高吞吐
  • 【Python】记录一次 Linux + Python + RocketMQ 辛酸历程

    文章目录 安装Python 准备环境 编译安装 遇到问题 安装openssl 重新编译 安装依赖库 准备代码 验证 这是记录一次辛酸的Linux Python RocketMQ使用历程 需求背景是需要验证线上一个RocketMQ服务和里面的
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • RocketMQ的消息优先级

    有些场景 需要应用程序处理几种类型的消息 不同消息的优先级不同 RocketMQ是个先入先出的队列 不支持消息级别或者Topic级别的优先级 业务中简单的优先级需求 可以通过间接的方式解决 下面列举三种优先级相关需求的具体处理方法 第一种
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事
  • 为什么Thread.sleep(0)可以阻止rocketmq中的gc?

    最近我阅读了RocketMQ的源代码 但我无法理解这段代码 为什么这段代码可以阻止gc呢 https github com apache rocketmq blob master store src main java org apache
  • 阿里架构专家力荐:架构修炼宝典,从基础到精通,让您轻松驾驭技术世界

    前言 作为程序员 确定发展方向和路线至关重要 而架构师则是许多人的追求之一 成为架构师并非易事 需要深厚的技术功底 当然 大厂架构师更具吸引力 但进入大厂并担任这一职位需要学习众多技术 或许你现在对此感到迷茫 但市面上已有多条现成的架构技术

随机推荐

  • ASP.NET WebApi + Autofac 实现依赖注入

    一 项目情况 框架 NET Framework 4 5 Autofac 3 5 0 Autofac WebApi2 4 3 0 二 定义接口与对应实现 接口1 public interface IBaseUserService List
  • 用Jupyter完成numpy、pandas、matplotlib三个库的例题

    文章目录 实验环境 一 numpy例题 二 pandas例题 三 matplotlib例题 四 总结 实验环境 jupyter notebook 一 numpy例题 生成一个一维数组 起始值为5 终点值为15 样本数为10个 import
  • 【转载】使用jsoup替换HTML标记

    原始代码 String html font fsdfs font dfsdf font dasdasd font Document doc Jsoup parse html Elements elements doc select font
  • 【frida逆向开发】frida-rpc远程调用某安app方法获取token

    目录 一 使用fiddler对app进行抓包 二 反编译app定位关键代码 三 frida rpc调用相关方法 一 使用fiddler对app进行抓包 通过抓包可以看到请求参数中X App Token e8f1c71569a7166b6aa
  • AD隐藏元件标号

    1 右键点击元件 不是标号 2 查找相似对象 3 点击应用 4 再点确定 5 把Show Name右边的勾取消
  • Go 语言进阶(一) -- Go hertz http框架、kitex RPC微服务框架、gorm 数据库框架三件套用法详解

    Go 框架三件套 1 概论 Gorm Gorm 是一个已经迭代了10年 的功能强大的 ORM框架 在字节内部被广泛使用并且拥有非常丰富的开源扩展 Kitex Kitex 是字节内部 Golang 微服务 RPC 框架 具有高性能 强可扩展的
  • 必看的知识

    学习路线 必看的知识 Spring实战 Spring 4 x企业应用开发实战 深入分析Java Web技术内幕 修订版 Effective Java Thinking in Java Java核心技术 Core Java Thinking
  • 赫拉(hera)分布式任务调度系统

    相关介绍 赫拉 hera 分布式任务调度系统之架构 基本功能 一 赫拉 hera 分布式任务调度系统之项目启动 二 赫拉 hera 分布式任务调度系统之开发中心 三 赫拉 hera 分布式任务调度系统之版本 四 赫拉 hera 分布式任务调
  • 修正了一个通信bug

    该BUG导致用户在打开webchat使用界面时不会读取联系人UPT 有时候刷新界面后会解决这个问题 经过发现是判断webchat是否存在在线用户以便于打印在线和离线联系人的分支结构出错 现在bug已经解除 同时解决了一个UPT串截断出错的b
  • java实现文件的断点续传的下载

    java的断点续传是基于之前java文件下载基础上的功能拓展 首先设置一个以线程ID为名的下载进度文件 每一次下载的进度会保存在这个文件中 下一次下载的时候 会根据进度文件里面的内容来判断下载的进度 package com ldw mult
  • Win10/Win11子系统(一)——wsl2+Ubuntu20.04安装记录

    windows子系统Ubuntu20 04安装过程记录 前言 一 安装前准备 二 开始安装 三 更换镜像源 四 安装图形化界面 五 警告处理 六 迁移子系统 前言 我和我最后的倔强 坚持不换windows的口号被现实打败了 装双系统会影响到
  • Hive SQL使用中遇到的问题与解决方案(持续更新

    近期 因统计分析 数据处理的工作需求 经常使用Hive SQL 因此记录遇到的一些问题 1 desc formatted 表名 确定表的信息 行 列 存储路径 在确定Hive 数据仓库中表的存储路径时 很有帮助 2 SQL GROUP BY
  • 【MedusaSTears】IntelliJ IDEA 自动生成方法注释模板设置(入参每行1个如图)

    快捷键 按键 按键 按键tab 效果图 设置方式 参考资料 https blog csdn net yuruixin china article details 80933835 我也是参考这个文章设置的 只不过我改了一些其它的内容 修改如
  • “疫情”防控时期大势所趋,智慧社区尽显“智慧”迎来新的发展热潮

    近期 国内新冠肺炎疫情在各地再次反扑 各种变异毒株 境外输入压力让疫情防控变的更加严峻 社区防控是第一道防线 进出小区人员登记 出示健康码 测量体温 居家隔离等是每个社区都要面临的防控压力 但是如果对社区内的居民不能精确管理 就会导致很多的
  • 1234. 替换子串得到平衡字符串

    有一个只含有 Q W E R 四种字符 且长度为 n 的字符串 假如在该字符串中 这四个字符都恰好出现 n 4 次 那么它就是一个 平衡字符串 给你一个这样的字符串 s 请通过 替换一个子串 的方式 使原字符串 s 变成一个 平衡字符串 你
  • Markdown预览 代码块自动化加代码行数-VSCode

    Markdown预览 代码块自动化加代码行数 VSCode 官方地址 https shd101wyy github io markdown preview enhanced zh cn markdown basics id 代码行数 第一步
  • JToolBarTest JToolBar 的一个测试类

    package com test JToolBarTest import javax swing JButton import javax swing JFrame import javax swing JToolBar public cl
  • python笔记:变量赋值与注意事项

    1 单个变量赋值 a 变量未赋值会报错 a 1 正确写法 2 多个变量赋值 方法1 a b c 1 方法2 a b c 1 1 1 print a b c 1 1 1 3 基本变量类型 五大类 字符串 string 数字 Numeric 列
  • 第1章 NumPy基础

    为何第1章介绍NumPy基础 在机器学习和深度学习中 图像 声音 文本等首先要数字化 如何实现数字化 数字化后如何处理 这些都涉及NumPy NumPy是数据科学的通用语言 它是科学计算 矩阵运算 深度学习的基石 PyTorch中的重要概念
  • 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践

    分布式消息队列RocketMQ 事务消息 解决分布式事务的最佳实践 标签 事务消息exactlyRocketMQKafka分布式消息队列 2016 12 23 22 08 7789人阅读 评论 8 收藏 举报 分类 分布式消息队列Rocke