基于RocketMQ分布式事务 - 完整示例

2023-10-31

前言

之前我们说到,分布式事务是一个复杂的技术问题。没有通用的解决方案,也缺乏简单高效的手段。

不过,如果我们的系统不追求强一致性,那么最常用的还是最终一致性方案。今天,我们就基于 RocketMQ来实现消息最终一致性方案的分布式事务。

本文代码不只是简单的demo,考虑到一些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务场景靠拢。

另外,在最后还有《RocketMQ技术内幕》一书中,关于分布式事务示例代码的错误流程分析,所以篇幅较长,希望大家耐心观看。

一、事务消息

在这里,笔者不想使用大量的文字赘述 RocketMQ事务消息的原理,我们只需要搞明白两个概念。

  • Half Message,半消息

暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。

当 Producer端对它二次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。

  • 事务状态回查

我们想,可能会因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。

当然,什么时候去扫描,包含扫描几次,我们都可以配置,在后文我们再细说。

简而言之,RocketMQ事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。

在本文,我们的代码就以 订单服务、积分服务 为例。结合上文来看,整体流程如下:

二、订单服务

在订单服务中,我们接收前端的请求创建订单,保存相关数据到本地数据库。

1、事务日志表

在订单服务中,除了有一张订单表之外,还需要一个事务日志表。 它的定义如下:

 
  1. CREATE TABLE `transaction_log` (

  2. `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事务ID',

  3. `business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '业务标识',

  4. `foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '对应业务表中的主键',

  5. PRIMARY KEY (`id`)

  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

这张表专门作用于事务状态回查。当提交业务数据时,此表也插入一条数据,它们共处一个本地事务中。通过事务ID查询该表,如果返回记录,则证明本地事务已提交;如果未返回记录,则本地事务可能是未知状态或者是回滚状态。

2、TransactionMQProducer

我们知道,通过 RocketMQ发送消息,需先创建一个消息发送者。值得注意的是,如果发送事务消息,在这里我们的创建的实例必须是 TransactionMQProducer

 
  1. @Component

  2. public class TransactionProducer {

  3. private String producerGroup = "order_trans_group";

  4. private TransactionMQProducer producer;

  5. //用于执行本地事务和事务状态回查的监听器

  6. @Autowired

  7. OrderTransactionListener orderTransactionListener;

  8. //执行任务的线程池

  9. ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,

  10. TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

  11. @PostConstruct

  12. public void init(){

  13. producer = new TransactionMQProducer(producerGroup);

  14. producer.setNamesrvAddr("127.0.0.1:9876");

  15. producer.setSendMsgTimeout(Integer.MAX_VALUE);

  16. producer.setExecutorService(executor);

  17. producer.setTransactionListener(orderTransactionListener);

  18. this.start();

  19. }

  20. private void start(){

  21. try {

  22. this.producer.start();

  23. } catch (MQClientException e) {

  24. e.printStackTrace();

  25. }

  26. }

  27. //事务消息发送

  28. public TransactionSendResult send(String data, String topic) throws MQClientException {

  29. Message message = new Message(topic,data.getBytes());

  30. return this.producer.sendMessageInTransaction(message, null);

  31. }

  32. }

上面的代码中,主要就是创建事务消息的发送者。在这里,我们重点关注 OrderTransactionListener,它负责执行本地事务和事务状态回查。

3、OrderTransactionListener

 
  1. @Component

  2. public class OrderTransactionListener implements TransactionListener {

  3. @Autowired

  4. OrderService orderService;

  5. @Autowired

  6. TransactionLogService transactionLogService;

  7. Logger logger = LoggerFactory.getLogger(this.getClass());

  8. @Override

  9. public LocalTransactionState executeLocalTransaction(Message message, Object o) {

  10. logger.info("开始执行本地事务....");

  11. LocalTransactionState state;

  12. try{

  13. String body = new String(message.getBody());

  14. OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);

  15. orderService.createOrder(order,message.getTransactionId());

  16. state = LocalTransactionState.COMMIT_MESSAGE;

  17. logger.info("本地事务已提交。{}",message.getTransactionId());

  18. }catch (Exception e){

  19. logger.info("执行本地事务失败。{}",e);

  20. state = LocalTransactionState.ROLLBACK_MESSAGE;

  21. }

  22. return state;

  23. }

  24. @Override

  25. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

  26. logger.info("开始回查本地事务状态。{}",messageExt.getTransactionId());

  27. LocalTransactionState state;

  28. String transactionId = messageExt.getTransactionId();

  29. if (transactionLogService.get(transactionId)>0){

  30. state = LocalTransactionState.COMMIT_MESSAGE;

  31. }else {

  32. state = LocalTransactionState.UNKNOW;

  33. }

  34. logger.info("结束本地事务状态查询:{}",state);

  35. return state;

  36. }

  37. }

在通过 producer.sendMessageInTransaction发送事务消息后,如果消息发送成功,就会调用到这里的executeLocalTransaction方法,来执行本地事务。在这里,它会完成订单数据和事务日志的插入。

该方法返回值 LocalTransactionState 代表本地事务状态,它是一个枚举类。

 
  1. public enum LocalTransactionState {

  2. //提交事务消息,消费者可以看到此消息

  3. COMMIT_MESSAGE,

  4. //回滚事务消息,消费者不会看到此消息

  5. ROLLBACK_MESSAGE,

  6. //事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚

  7. UNKNOW;

  8. }

那么, checkLocalTransaction 方法就是用于事务状态查询。在这里,我们通过事务ID查询transaction_log这张表,如果可以查询到结果,就提交事务消息;如果没有查询到,就返回未知状态。

注意,这里还涉及到另外一个问题。如果是返回未知状态,RocketMQ Broker服务器会以1分钟的间隔时间不断回查,直至达到事务回查最大检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。

当然,事务回查的频率和最大次数,我们都可以配置。在 Broker 端,可以通过这样来配置它:

 
  1. brokerConfig.setTransactionCheckInterval(10000); //回查频率10秒一次

  2. brokerConfig.setTransactionCheckMax(3); //最大检测次数为3

4、业务实现类

 
  1. @Service

  2. public class OrderServiceImpl implements OrderService {

  3. @Autowired

  4. OrderMapper orderMapper;

  5. @Autowired

  6. TransactionLogMapper transactionLogMapper;

  7. @Autowired

  8. TransactionProducer producer;

  9. Snowflake snowflake = new Snowflake(1,1);

  10. Logger logger = LoggerFactory.getLogger(this.getClass());

  11. //执行本地事务时调用,将订单数据和事务日志写入本地数据库

  12. @Transactional

  13. @Override

  14. public void createOrder(OrderDTO orderDTO,String transactionId){

  15. //1.创建订单

  16. Order order = new Order();

  17. BeanUtils.copyProperties(orderDTO,order);

  18. orderMapper.createOrder(order);

  19. //2.写入事务日志

  20. TransactionLog log = new TransactionLog();

  21. log.setId(transactionId);

  22. log.setBusiness("order");

  23. log.setForeignKey(String.valueOf(order.getId()));

  24. transactionLogMapper.insert(log);

  25. logger.info("订单创建完成。{}",orderDTO);

  26. }

  27. //前端调用,只用于向RocketMQ发送事务消息

  28. @Override

  29. public void createOrder(OrderDTO order) throws MQClientException {

  30. order.setId(snowflake.nextId());

  31. order.setOrderNo(snowflake.nextIdStr());

  32. producer.send(JSON.toJSONString(order),"order");

  33. }

  34. }

在订单业务服务类中,我们有两个方法。一个用于向RocketMQ发送事务消息,一个用于真正的业务数据落库。

至于为什么这样做,其实有一些原因的,我们后面再说。

5、调用

 
  1. @RestController

  2. public class OrderController {

  3. @Autowired

  4. OrderService orderService;

  5. Logger logger = LoggerFactory.getLogger(this.getClass());

  6. @PostMapping("/create_order")

  7. public void createOrder(@RequestBody OrderDTO order) throws MQClientException {

  8. logger.info("接收到订单数据:{}",order.getCommodityCode());

  9. orderService.createOrder(order);

  10. }

  11. }

6、总结

目前已经完成了订单服务的业务逻辑。我们总结流程如下:

考虑到异常情况,这里的要点如下:

  • 第一次调用createOrder,发送事务消息。如果发送失败,导致报错,则将异常返回,此时不会涉及到任何数据安全。
  • 如果事务消息发送成功,但在执行本地事务时发生异常,那么订单数据和事务日志都不会被保存,因为它们是一个本地事务中。
  • 如果执行完本地事务,但未能及时的返回本地事务状态或者返回了未知状态。那么,会由Broker定时回查事务状态,然后根据事务日志表,就可以判断订单是否已完成,并写入到数据库。

基于这些要素,我们可以说,已经保证了订单服务和事务消息的一致性。那么,接下来就是积分服务如何正确的消费订单数据并完成相应的业务操作。

三、积分服务

在积分服务中,主要就是消费订单数据,然后根据订单内容,给相应用户增加积分。

1、积分记录表

 
  1. CREATE TABLE `t_points` (

  2. `id` bigint(16) NOT NULL COMMENT '主键',

  3. `user_id` bigint(16) NOT NULL COMMENT '用户id',

  4. `order_no` bigint(16) NOT NULL COMMENT '订单编号',

  5. `points` int(4) NOT NULL COMMENT '积分',

  6. `remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '备注',

  7. PRIMARY KEY (`id`)

  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

在这里,我们重点关注order_no字段,它是实现幂等消费的一种选择。

2、消费者启动

 
  1. @Component

  2. public class Consumer {

  3. String consumerGroup = "consumer-group";

  4. DefaultMQPushConsumer consumer;

  5. @Autowired

  6. OrderListener orderListener;

  7. @PostConstruct

  8. public void init() throws MQClientException {

  9. consumer = new DefaultMQPushConsumer(consumerGroup);

  10. consumer.setNamesrvAddr("127.0.0.1:9876");

  11. consumer.subscribe("order","*");

  12. consumer.registerMessageListener(orderListener);

  13. consumer.start();

  14. }

  15. }

启动一个消费者比较简单,我们指定要消费的 topic 和监听器就好了。

3、消费者监听器

 
  1. @Component

  2. public class OrderListener implements MessageListenerConcurrently {

  3. @Autowired

  4. PointsService pointsService;

  5. Logger logger = LoggerFactory.getLogger(this.getClass());

  6. @Override

  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {

  8. logger.info("消费者线程监听到消息。");

  9. try{

  10. for (MessageExt message:list) {

  11. logger.info("开始处理订单数据,准备增加积分....");

  12. OrderDTO order = JSONObject.parseObject(message.getBody(), OrderDTO.class);

  13. pointsService.increasePoints(order);

  14. }

  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

  16. }catch (Exception e){

  17. logger.error("处理消费者数据发生异常。{}",e);

  18. return ConsumeConcurrentlyStatus.RECONSUME_LATER;

  19. }

  20. }

  21. }

监听到消息之后,调用业务服务类处理即可。处理完成则返回CONSUME_SUCCESS以提交,处理失败则返回RECONSUME_LATER来重试。

4、增加积分

在这里,主要就是对积分数据入库。但注意,入库之前需要先做判断,来达到幂等性消费。

 
  1. @Service

  2. public class PointsServiceImpl implements PointsService {

  3. @Autowired

  4. PointsMapper pointsMapper;

  5. Snowflake snowflake = new Snowflake(1,1);

  6. Logger logger = LoggerFactory.getLogger(this.getClass());

  7. @Override

  8. public void increasePoints(OrderDTO order) {

  9. //入库之前先查询,实现幂等

  10. if (pointsMapper.getByOrderNo(order.getOrderNo())>0){

  11. logger.info("积分添加完成,订单已处理。{}",order.getOrderNo());

  12. }else{

  13. Points points = new Points();

  14. points.setId(snowflake.nextId());

  15. points.setUserId(order.getUserId());

  16. points.setOrderNo(order.getOrderNo());

  17. Double amount = order.getAmount();

  18. points.setPoints(amount.intValue()*10);

  19. points.setRemarks("商品消费共【"+order.getAmount()+"】元,获得积分"+points.getPoints());

  20. pointsMapper.insert(points);

  21. logger.info("已为订单号码{}增加积分。",points.getOrderNo());

  22. }

  23. }

  24. }

5、幂等性消费

实现幂等性消费的方式有很多种,具体怎么做,根据自己的情况来看。

比如,在本例中,我们直接将订单号和积分记录绑定在同一个表中,在增加积分之前,就可以先查询此订单是否已处理过。

或者,我们也可以额外创建一张表,来记录订单的处理情况。

再者,也可以将这些信息直接放到redis缓存里,在入库之前先查询缓存。

不管以哪种方式来做,总的思路就是在执行业务前,必须先查询该消息是否被处理过。那么这里就涉及到一个数据主键问题,在这个例子中,我们以订单号为主键,也可以用事务ID作主键,如果是普通消息的话,我们也可以创建唯一的消息ID作为主键。

6、消费异常

我们知道,当消费者处理失败后会返回 RECONSUME_LATER ,让消息来重试,默认最多重试16次。

那,如果真的由于特殊原因,消息一直不能被正确处理,那怎么办 ?

我们考虑两种方式来解决这个问题。

第一,在代码中设置消息重试次数,如果达到指定次数,就发邮件或者短信通知业务方人工介入处理。

 
  1. @Component

  2. public class OrderListener implements MessageListenerConcurrently {

  3. Logger logger = LoggerFactory.getLogger(this.getClass());

  4. @Override

  5. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {

  6. logger.info("消费者线程监听到消息。");

  7. for (MessageExt message:list) {

  8. if (!processor(message)){

  9. return ConsumeConcurrentlyStatus.RECONSUME_LATER;

  10. }

  11. }

  12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

  13. }

  14. /**

  15. * 消息处理,第3次处理失败后,发送邮件通知人工介入

  16. * @param message

  17. * @return

  18. */

  19. private boolean processor(MessageExt message){

  20. String body = new String(message.getBody());

  21. try {

  22. logger.info("消息处理....{}",body);

  23. int k = 1/0;

  24. return true;

  25. }catch (Exception e){

  26. if(message.getReconsumeTimes()>=3){

  27. logger.error("消息重试已达最大次数,将通知业务人员排查问题。{}",message.getMsgId());

  28. sendMail(message);

  29. return true;

  30. }

  31. return false;

  32. }

  33. }

  34. }

第二,等待消息重试最大次数后,进入死信队列。

消息重试最大次数默认是16次,我们也可以在消费者端设置这个次数。

consumer.setMaxReconsumeTimes(3);//设置消息重试最大次数

死信队列的主题名称是 %DLQ% + 消费者组名称,比如在订单数据中,我们设置了消费者组名:

String consumerGroup = "order-consumer-group";

那么这个消费者,对应的死信队列主题名称就是%DLQ%order-consumer-group

如上图,我们还需要点击TOPIC配置,来修改里面的 perm 属性,改为 6 即可。

最后就可以通过程序代码监听这个主题,来通知人工介入处理或者直接在控制台查看处理了。通过幂等性消费和对死信消息的处理,基本上就能保证消息一定会被处理。

四、《RocketMQ技术内幕》中的代码示例

笔者手里有一本书《RocketMQ技术内幕》,在 9.4 章节有一段分布式事务的代码。

不过,笔者在看了之后,感觉它里面的流程是有问题的,会造成本地事务的不一致,下面我们就来分析一下。

在这里,我们主要是关注书中订单业务服务类和事务监听器的流程。

在书中,订单下单伪代码如下:

 
  1. public Map createOrder(){

  2. Map result = new HashMap();

  3. //执行下订单相关的业务流程,例如操作本地数据库落库相关代码

  4. //生成事务消息唯一业务标识,将该业务标识组装到待发送的消息体中,方便消息端进行幂等消费。

  5. //调用消息客户端API,发送事务prepare消息。

  6. //返回结果,提交事务

  7. return result;

  8. }

上述是第一步,发送事务消息,接下来需要实现TransactionListener,实现执行本地事务与本地事务回查。

 
  1. public class OrderTransactionListenerImpl implements TransactionListener {

  2. @Override

  3. public LocalTransactionState executeLocalTransaction(Message message, Object o) {

  4. //从消息体中获取业务唯一ID

  5. String bizUniNo = message.getUserProperty("bizUniNo");

  6. //将bizUniNo入库,表名:t_message_transaction,表结构 bizUniNo(主键),业务类型。

  7. return LocalTransactionState.UNKNOW;

  8. }

  9. @Override

  10. public LocalTransactionState checkLocalTransaction(MessageExt message) {

  11. //从消息体中获取业务唯一ID

  12. String bizUniNo = message.getUserProperty("bizUniNo");

  13. //如果本地事务表(t_message_transaction)存在记录,则认为提交;如果不存在返回未知。

  14. //如果多次回查还是未查到消息,则回滚。

  15. if (query(bizUniNo)>0){

  16. return LocalTransactionState.COMMIT_MESSAGE;

  17. }else{

  18. return LocalTransactionState.UNKNOW;

  19. }

  20. }

  21. //查询数据库是否存在记录

  22. public int query(String bizUniNo){

  23. //select count(1) from t_message_transaction where biz_uni_no = #{bizUniNo}

  24. return 1;

  25. }

  26. }

上面的代码是笔者在这本书里,抄录出来的,如果是按照这种做法, 实际上是有问题的,我们来分析一下。

1、下单异常

我们看上面的订单下单的伪代码,里面包含两个操作:订单入库和事务消息发送。

那么我们继续思考:

  • 如果订单入库的时候发生异常,这个没问题,因为事务消息也不会发送;
  • 如果订单入库执行完毕,但发送事务消息报错。这个也没问题,订单数据会回滚;
  • 如果订单入库执行完毕,发送事务消息也没有报错。但返回的不是SEND_OK状态,这个是有问题的。

因为只有发送事务消息成功,并且发送状态为SEND_OK,才会执行监听器中的本地事务,向t_message_transaction表写入事务日志。

那么就会造成一个现场:本地订单数据已经入库,但是由于没有返回SEND_OK状态,导致不会执行本地事务中的事务日志。那么这条事务消息早晚会被回滚,最后的问题就是用户下单成功,但没有增加积分。

2、本地事务执行异常

事实上,第一个问题也可以规避。那就是在发送完事务消息后,再判断下发送状态是不是SEND_OK,如果不是的话,就通过抛异常的方式来回滚订单数据。

但是,还有第二个问题:

如果订单数据和事务消息发送都没有问题,但是在执行本地事务时,写入事务日志时发生异常怎么办 ?

如果是这样,也会导致本地订单数据已经入库,但是事务日志没有写入,在事务状态回查的时候一直查询不到此记录,最后只能回滚事务消息。最后的现象同样是用户下单成功,但没有增加积分。

但是在书中,作者有这样一段话:

executeLocalTransaction,该方法主要设置本地事务状态,与业务代码在一个事务中。例如在OrderService#createOrder中,只要本地事务提交成功,该方法也会提交成功。故在这里,主要是向t_message_transaction添加一条记录,在事务回查时,如果存在记录,就认为该消息需要提交。

作者这段话的意思,我理解是说他们都处于一个本地事务中。如果createOrder方法执行成功,则executeLocalTransaction方法也会执行成功;如果任何一方出错,都会回滚事务。

但是,我们从源码中分析的话,如果本地事务执行报错,订单数据是不会回滚的。

3、源码分析

首先,我们要知道,executeLocalTransaction方法和createOrder方法确实在一个事务里。

这是因为executeLocalTransaction方法,是在发送事务消息之后,同步调用到的,所以它们在一个事务里。

我们来看源码中,事务消息发送的过程:

 
  1. public TransactionSendResult sendMessageInTransaction(Message msg,

  2. LocalTransactionExecuter localTransactionExecuter,

  3. Object arg)throws MQClientException {

  4. //发送事务消息返回结果

  5. SendResult sendResult = null;

  6. //如果发送消息失败,抛出异常

  7. try {

  8. sendResult = this.send(msg);

  9. } catch (Exception var11) {

  10. throw new MQClientException("send message Exception", var11);

  11. }

  12. //初始化本地事务状态:未知状态

  13. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;

  14. Throwable localException = null;

  15. switch(sendResult.getSendStatus()) {

  16. //如果发送事务消息状态为send_ok

  17. case SEND_OK:

  18. try {

  19. //执行本地事务方法

  20. if (transactionListener != null) {

  21. this.log.debug("Used new transaction API");

  22. localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

  23. }

  24. } catch (Throwable var10) {

  25. this.log.info("executeLocalTransactionBranch exception", var10);

  26. this.log.info(msg.toString());

  27. localException = var10;

  28. }

  29. break;

  30. //如果发送事务状态不是send_ok,该事务消息会被回滚

  31. case FLUSH_DISK_TIMEOUT:

  32. case FLUSH_SLAVE_TIMEOUT:

  33. case SLAVE_NOT_AVAILABLE:

  34. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

  35. }

  36. //结束事务,就是根据本地事务状态,执行提交、回滚或暂不处理事务

  37. try {

  38. this.endTransaction(sendResult, localTransactionState, localException);

  39. } catch (Exception var9) {

  40. this.log.warn("", var9);

  41. }

  42. TransactionSendResult transactionSendResult = new TransactionSendResult();

  43. transactionSendResult.setSendStatus(sendResult.getSendStatus());

  44. transactionSendResult.setMessageQueue(sendResult.getMessageQueue());

  45. transactionSendResult.setMsgId(sendResult.getMsgId());

  46. transactionSendResult.setQueueOffset(sendResult.getQueueOffset());

  47. transactionSendResult.setTransactionId(sendResult.getTransactionId());

  48. transactionSendResult.setLocalTransactionState(localTransactionState);

  49. return transactionSendResult;

  50. }

上面的代码,就是发送事务消息的过程。我们重点来看,如果事务消息发送成功,并且返回状态为SEND_OK,那么就去执行监听器中的executeLocalTransaction方法,这说明它们在一个事务中。

但是,在执行过程中,它手动捕获了 Throwable 异常。这就说明,即便执行本地事务失败,也不会触发回滚的。

至此,我们已经非常明确了,如果按照书里的流程来写代码,这块就会成为一个隐患点。

如果想规避这个问题,我们只能修改rocket-client中的代码,比如:

 
  1. try {

  2. //执行本地事务方法

  3. if (transactionListener != null) {

  4. this.log.debug("Used new transaction API");

  5. localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

  6. }

  7. } catch (Throwable var10) {

  8. this.log.info("executeLocalTransactionBranch exception", var10);

  9. this.log.info(msg.toString());

  10. localException = var10;

  11. throw new MQClientException(e.getMessage(),e);

  12. }

笔者通过修改源码,并测试了一下,通过这种手动抛出异常的方式也是可以的。这样的话如果执行本地事务的时候出错,也会回滚订单数据。

到这里,就能回答笔者本文2.4章节里的一个问题:

为什么在订单业务服务类中,需要有两个方法。一个用于向RocketMQ发送事务消息,一个用于真正的业务数据落库。

五、总结

本文重点阐述了基于RocketMQ来实现最终一致性的分布式事务案例。

另外,也分享了关于《RocketMQ技术内幕》一书中,分布式事务示例代码,可能出现的异常问题。关于这一点,也希望朋友们如果有不同看法,积极留言,共同交流。


作者:清幽之地
链接:https://juejin.im/post/5e737d155188254943200ed0
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

官网参考:

消息队列RocketMQ版的事务消息有什么特点_云消息队列 MQ-阿里云帮助中心

https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

史上最强Tomcat8性能优化

阿里巴巴为什么能抗住90秒100亿?--服务端高并发分布式架构演进之路

B2B电商平台--ChinaPay银联电子支付功能

学会Zookeeper分布式锁,让面试官对你刮目相看

SpringCloud电商秒杀微服务-Redisson分布式锁方案

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于RocketMQ分布式事务 - 完整示例 的相关文章

  • conda加速设置

    Conda作为使用最为便捷的python环境管理工具 可以协助我们很方便的下载安装第三方库 软件包等操作 但其在下载资源的过程中速度不言而喻 尤其是在更换国内源的情况下 下载速度没有实质性的改变是很令人头疼的一件事 Mamba 树眼镜蛇 能
  • (tensorflow学习)用Object Detection API实现摄像头实时物体检测

    对于物体识别 谷歌已经有训练好的模型供我们使用 图方便不想自己训练的可以直接使用 说实话 装这个tensorflow真心麻烦 我建议用anaconda环境搭建 还要注意装的话装1 几的版本就可 用gpu跑的话注意显卡型号和版本是否兼容 真是
  • 【C++】内存管理

    目录 一 C C 内存分布 二 C语言中动态内存管理方式 三 C 中动态内存管理 1 开辟空间 2 释放空间 四 operator new与operator delete函数 五 内存泄漏 1 什么是内存泄漏 2 如何避免内存泄漏 总结 一

随机推荐

  • Python的getattr方法

    getattr是Python中的内置函数 用于获取一个对象的属性值 这个函数是动态获取属性的一种方式 特别适用于你事先不知道要获取哪个属性 或者属性名是在运行时确定的情况 使用方法 getattr object name default o
  • 资产安全 错题点

    数据所有者 1 决定谁有权访问信息系统 2 对资产负有最终责任 PS 对资产负有最终责任的 高级管理层 数据所有者 首选管理层 3 行为规则 制定规则 以便用于主体的数据或信息的适当使用及保护 4 决定数据的级别 每年回顾确保数据分级的正确
  • 【国产化踩坑记】openEuler系统安装,nvidia驱动,cuda,anaconda安装步骤记录

    1 openEuler安装步骤 尝试安装了openEuler20 03和22 03两个版本 在摸索的过程中总结了一下步骤 以及相关问题的解决方案 进行简单记录 便于后续使用 1 openEuler20 03安装步骤 网络配置以及可视化操作界
  • Segmentation fault (core dumped) 错误的一种解决场景

    错误类型 Segmentation fault core dumped 产生原因 Segmentation fault 段错误 Core Dump 核心转储 是操作系统在进程收到某些信号而终止运行时 将此时进程地址空间的内容以及有关进程状态
  • Springboot+Axios双token解决token过期续签问题

    后端分离使用token进行登录验证时 由于token存在过期时间 每次token过期都需要用户重新登录的话 用户体验很不友好 假如token能跟session一样 如果用户持续在进行操作 就自动延长有效时间 就可以解决问题 但是 token
  • qt利用腾讯云服务器实现不同局域网的通信(tcp)

    网上大多数关于qt通信的文章都是同一局域网通信 这种根本没有达到自己想象中的那种通信的要求 不同局域网的通信 这里用到的方法是客户端发送消息给服务器 然后服务器再发送给另一个局域网的客户 首先我们需要购买一个腾讯云服务器 并在自己电脑登录腾
  • Python记11(网络传输大文件

    客户端 import socket tqdm os 传输数据分隔符 separator
  • log4j2入门(三) PatternLayout输出格式详解

    摘要 本节介绍Log4j的输出格式的详细说明 1 PatternLayout参数 charset 指定字符集 pattern 指定格式 alwaysWriteExceptions 默认为true 输出异常 header 可选项 包含在每个日
  • connect和bind

    UDP 考虑以下情形 我们使用UDP写一个echo程序 客户端模型 while fget sendto recvfrom 如果服务器进程没有启动会如何 通过截包发现服务器响应一个icmp port unreachable 不过这个ICMP错
  • java: javamail 1.6.2 Create Receive Email using jdk 19

    接收邮件 中文是乱码 未解决 param pop3Host pop 163 com param storeType pop3 param user geovindu 163 com param password geovindu autho
  • 安装DevEco Studio 3.0 Beta2

    引言 鸿蒙应用程序前端 北向开发 的开发环境是华为提供的HUAWEI DevEco Studio DevEco Studio支持Windows和macOS系统 本文记录了DevEco Studio 3 0 Beta2在Windows操作系统
  • 学习笔记 JavaScript ES6 NRM源切换

    NRM npm registry manager 镜像源管理工具 两种切换方式 一 终端里输入如下命令即可切换至淘宝镜像源 mac下测试通过 npm config set registry http registry npm taobao
  • krita windows编译源码

    Qt系列文章目录 文章目录 Qt系列文章目录 前言 一 krita 二 krita源码编译 1 Windows下编译 1 编译准备 2 相关命令 使用CMake编译krita 重新编译 使用CMkae bash find package Z
  • 06C++11多线程编程之lock_guard类模板

    06C 11多线程编程之lock guard类模板 1 lock guard概念 1 lock guard是一个类模板 它是mutex的进化版 自动lock 和unlock 类似独占型智能指针unique ptr 是一个保姆 在lock g
  • QT解析XML的三种方式

    1 QT QXmlStreamReader用法小结 解析常用到的函数含义 1 导入一个xml文件或字符串的方式 方式一 QXmlStreamReader reader sXMLContent 字符串的xml 方式二 QXmlStreamRe
  • 自然连接(NATURAL JOIN)

    自然连接 NATURAL JOIN 是一种特殊的等值连接 将表中具有相同名称的列自动进行匹配 1 自然连接不必指定任何连接条件 SQL gt desc emp Name Null Type EMPNO NOT NULL NUMBER 4 E
  • 城市配电网恢复方法

    城市配电网恢复方法是指在大停电事故后 配网与主网断开连接 只能协同利用配网中的分布式电源进行恢复供电的方法 该方法需要考虑多时段 多类型负荷的恢复需求 以及电网 水网 气网的运行约束和发电资源的有限能量约束 计及关键负荷功能恢复需求的多时段
  • 【考研复习:数据结构】查找(不含代码篇)

    前言 1 此篇是基于博主对严蔚敏版教材 数据结构 王道书 数据结构 和在网上相关资料的查询 对第七章 查找 的学习总结 2 查找这一章含代码 C 会写在另一篇 写好后再放链接 3 博主比较喜欢用表格使思路稍微清晰一些 还有一些博主自己怕记乱
  • 找二叉树的中序后继

    设计一个算法 找出二叉搜索树中指定节点的 下一个 节点 也即中序后继 如果指定节点没有对应的 下一个 节点 则返回null 方法一 Definition for a binary tree node public class TreeNod
  • 基于RocketMQ分布式事务 - 完整示例

    前言 之前我们说到 分布式事务是一个复杂的技术问题 没有通用的解决方案 也缺乏简单高效的手段 不过 如果我们的系统不追求强一致性 那么最常用的还是最终一致性方案 今天 我们就基于 RocketMQ来实现消息最终一致性方案的分布式事务 本文代