五张图带你理解 RocketMQ 顺序消息实现机制

2023-10-27

大家好,我是君哥。今天聊一聊 RocketMQ 的顺序消息实现机制。

在有些场景下,使用 MQ 需要保证消息的顺序性,比如在电商系统中,用户提交订单、支付订单、订单出库这 3 个消息应该保证顺序性,如下图:

对于 RocketMQ 来说,主要是通过 Producer 和 Consumer 来保证 消 息顺序的。

1、Producer

下面代码是 Producer 发送顺序消息的官方示例:

public static void main(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  producer.start();
  String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf("%s%n", sendResult);
  }
  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

跟发送并发消息不一样的是,发送消息时传入了 MessageQueueSelector,这里可以指定消息发送到固定的 MessageQueue。

注意:上面的代码把 orderId 相同的消息都会发送到同一个 MessageQueue,这样同一个 orderId 的消息是有序的,这也叫做局部有序。对应的另一种是全局有序,这需要把所有的消息都发到同一个 MessageQueue。

下面再来看一下发送的代码:

private SendResult sendSelectImpl(
 Message msg,
 MessageQueueSelector selector,
 Object arg,
 final CommunicationMode communicationMode,
 final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 //省略部分逻辑
 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
 if (topicPublishInfo != null && topicPublishInfo.ok()) {
  MessageQueue mq = null;
  try {
   List<MessageQueue> messageQueueList =
    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
   Message userMessage = MessageAccessor.cloneMessage(msg);
   String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
   userMessage.setTopic(userTopic);

   mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
  } catch (Throwable e) {
   throw new MQClientException("select message queue threw exception.", e);
  }
  //省略部分逻辑
  if (mq != null) {
   return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
  } else {
   throw new MQClientException("select message queue return null.", null);
  }
 }
    //省略部分逻辑
}

可以看到,在发送的时候,使用 MessageQueueSelector 选择一个 MessageQueue,然后发送消息到这个 MessageQueue。对于并发消息,这里不传 MessageQueueSelector,如果发送方法没有指定 MessageQueue,就会按照默认的策略选择一个。

2、Consumer

以 RocketMQ 推模式为例,消费者会注册一个监听器,进行消息的拉取和消费处理,下面的 UML 类图显示了调用关系:

上图中包含了对顺序消息和对并发消息的处理。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 对顺序消息进行处理。跟并发消息不一样的是,顺序消息定义了一个 MessageQueueLock 类,这个类保存了每个 MessageQueue 对应的锁,代码如下:

private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

下面代码是顺序消费的官方示例:

public static void main(String[] args) throws MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 consumer.subscribe("TopicTest", "TagA || TagC || TagD");
 consumer.registerMessageListener(new MessageListenerOrderly() {
  AtomicLong consumeTimes = new AtomicLong(0);
  @Override
  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
   context.setAutoCommit(true);
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
   this.consumeTimes.incrementAndGet();
   if ((this.consumeTimes.get() % 2) == 0) {
    return ConsumeOrderlyStatus.SUCCESS;
   } else if ((this.consumeTimes.get() % 5) == 0) {
    context.setSuspendCurrentQueueTimeMillis(3000);
    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
   }
   return ConsumeOrderlyStatus.SUCCESS;
  }
 });
 consumer.start();
 System.out.printf("Consumer Started.%n");
}

下面看一下顺序消息的消费端处理逻辑。

(1)注册监听

上面的代码定义了顺序消息监听器 MessageListenerOrderly,并且注册到 DefaultMQPushConsumer,这个注册同时也注册到了 DefaultMQPushConsumerImpl。

(2)PushConsumer 初始化

在 DefaultMQPushConsumerImpl 类初始化的时候,会判断注册的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 变量设置为 true,以此来标记是顺序消息拉取还是并发消息拉取。然后把 ConsumeMessageService 初始化为 ConsumeMessageOrderlyService。

(3)锁定 mq

要保证消息的顺序性,就需要保证同一个 MessageQueue 只能被同一个 Consumer 消费。

ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,周期性(默认 20s)地向 Broker 发送锁定消息(请求类型 LOCK_BATCH_MQ),Broker 收到后,就会把 MessageQueue、group 和 clientId 进行绑定,这样其他客户端就不能从这个 MessageQueue 拉取消息。

注意:Broker 的锁定是有过期时间的,默认 60s,可以配置,锁定过期后,有可能被其他 Consumer 进行消费。

Broker 端锁结构如下图:

(4)拉取消息

消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法。这里拉取消息的逻辑跟并发消息逻辑是一样的。

拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用线程池提交了 ConsumeRequest 线程。

PullCallback pullCallback = new PullCallback() {
 @Override
 public void onSuccess(PullResult pullResult) {
  if (pullResult != null) {
   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);
   switch (pullResult.getPullStatus()) {
    case FOUND:
     //省略
     if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
     } else {
      //省略
      boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
      DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
       pullResult.getMsgFoundList(),
       processQueue,
       pullRequest.getMessageQueue(),
       dispatchToConsume);
                        //省略
     }
     //省略
     break;
    //省略
   }
  }
 }
    //省略
};

上面拉取到消息后,先把消息放到了 ProcessQueue,然后调用了 submitConsumeRequest 方法。跟并发消息处理方式不同的是,submitConsumeRequest 方法并没有处理拉取到的消息,而真正处理的时候是从 ProcessQueue 获取。

(5)处理消息

处理消息的逻辑在 ConsumeMessageOrderlyService 的内部类 ConsumeRequest,这是一个线程类,run 方法如下:

public void run() {
 //省略部分逻辑
 //1.获取到 MessageQueueLock 对应的锁
 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
 synchronized (objLock) {
  if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
   || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
   final long beginTime = System.currentTimeMillis();
   for (boolean continueConsume = true; continueConsume; ) {
    //省略延后执行的逻辑
    final int consumeBatchSize =
     ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                //2.从 processQueue 拉取消息
    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
    if (!msgs.isEmpty()) {
     final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

     ConsumeOrderlyStatus status = null;
                    //省略部分逻辑
     boolean hasException = false;
     try {
         //3.获取处理锁
      this.processQueue.getConsumeLock().lock();
      //4.执行消费处理逻辑
      status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
     } catch (Throwable e) {
      log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
       RemotingHelper.exceptionSimpleDesc(e),
       ConsumeMessageOrderlyService.this.consumerGroup,
       msgs,
       messageQueue), e);
      hasException = true;
     } finally {
         //5.释放处理锁
      this.processQueue.getConsumeLock().unlock();
     }
     //省略部分逻辑
     continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
    } else {
     continueConsume = false;
    }
   }
  } else {
   //省略部分逻辑
   ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
  }
 }
}

上面的代码总结一下,Consumer 消费消息的逻辑如下:

  1. 对 MessageQueueLock 进行加锁,这样就保证只有一个线程在处理当前 MessageQueue。
  2. 从 ProcessQueue 拉取一批消息。
  3. 获取 ProcessQueue 锁,这样保证了只有当前线程可以进行消息处理,同时也可以防止 Rebalance 线程把当前处理的 MessageQueue 移除掉。
  4. 执行消费处理逻辑。
  5. 释放 ProcessQueue 处理锁;6.processConsumeResult 方法更新消息偏移量。

注意:ProcessQueue 中的锁是 ReentrantLock。

3、重试

跟并发消息不一样的是,顺序消息消费失败后并不会把消息发送到 Broker,而是直接在 Consumer 端进行重试,如果重试次数超过了最大重试次数(16 次),则发送到 Broker,Broker 则将消息推入死信队列。如下图:

4、总结

RocketMQ 顺序消息的原理是在 Producer 端把一批需要保证顺序的消息发送到同一个 MessageQueue,Consumer 端则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费。

根据实现原理可以看到,RocketMQ 的顺序消息可能存在两个坑:

  1. 有顺序性的消息需要发送到同一个 MessageQueue,可能导致单个 MessageQueue 消息量很大,而 Consumer 端消费的时候只能单线程消费,很可能导致当前 MessageQueue 消息积压。
  2. 如果顺序消息 MessageQueue 所在的 broker 挂了,这时 Producer 只能把消息发送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消费,这样两个 Consumer 消费的消息就不能保证顺序性了。如下图:

Broker1 发生故障,把订单出库的消息发送到了 Broker2,由 Consumer2 来进行消费,消息顺序很可能会错乱。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

五张图带你理解 RocketMQ 顺序消息实现机制 的相关文章

随机推荐

  • 什么是JavaBean、bean? 什么是POJO、PO、DTO、VO、BO ? 什么是EJB、EntityBean?

    前言 在Java开发中经常遇到这些概念问题 有的可能理解混淆 有的可能理解不到位 特此花了很多时间理顺了这些概念 不过有些概念实际开发中并没有使用到 可能理解还不够准确 只能靠后续不断纠正了 1 什么是POJO POJO Plain Old
  • RPC 技术及其框架 Sekiro 在爬虫逆向中的应用,加密数据一把梭

    文章目录 什么是 RPC JSRPC Sekiro 优缺点 什么是 RPC RPC 英文 RangPaCong 中文让爬虫 旨在为爬虫开路 秒杀一切 让爬虫畅通无阻 开个玩笑 实际上 RPC 为远程过程调用 全称 Remote Proced
  • LeetCode——036

    Valid Sudoku My Submissions QuestionEditorial Solution Total Accepted 71051 Total Submissions 233215 Difficulty Easy Det
  • AI 大行其道,你准备好了吗?—谨送给徘徊于转行 AI 的程序员

    前言 近年来 随着 Google 的 AlphaGo 打败韩国围棋棋手李世乭之后 机器学习尤其是深度学习的热潮席卷了整个 IT 界 所有的互联网公司 尤其是 Google 微软 百度 腾讯等巨头 无不在布局人工智能技术和市场 百度 腾讯 阿
  • 学习Javascript闭包(Closure)[非常棒的文章]

    作者 阮一峰 日期 2009年8月30日 闭包 closure 是Javascript语言的一个难点 也是它的特色 很多高级应用都要依靠闭包实现 下面就是我的学习笔记 对于Javascript初学者应该是很有用的 一 变量的作用域 要理解闭
  • 关于论青少年尽早学少儿编程之说

    关于论青少年尽早学少儿编程之说 正如一本书中所描述的一句话 尽早学习编程 是孩子为未来做好准备必不可少的一步 看完这句话之后 给我们的直观印象可能就是 不教孩子学习编程在某种程度上等于不教他们读书写字 这种说法明显是片面的 编程 读书写字
  • 若依系统注册功能

    加油 三步实现注册 前端 后端 分配角色 总结 前端 login vue中打开注册开关 后端 打开数据库sys config表 开启注册功能 分配角色 在SysUserMapper中添加方法 实现方法 在SysUserServiceImpl
  • dialog中二维码显示问题

    由于dialog加载过程会耗费一定时间 因此在dialog中直接调用会导致在一次打开的dialog无法加载二维码 在dialog标签中加入 opened ShowQRCode 属性 opened是dialog动画打开完毕之后的回调 当页面加
  • 计算机网络层提供的面向连接服务还是无连接服务讨论与思考

    概要 在计算机网络领域 网络层应该向运输层提供怎样的服务 面向连接 还是 无连接 曾引起了长期的争论 争论焦点的实质就是 在计算机通信中 可靠交付应当由谁来负责 是网络还是端系统 介绍 有些人认为应当借助于电信网的成功经验 让网络负责可靠交
  • 计算机主机名与用户名区别

    一 主机名概念 主机名就是计算机的名字 计算机名 网上邻居就是根据主机名来识别的 这个名字可以随时更改 从我的电脑属性的计算机名就可更改 用户登陆时候用的是操作系统的个人用户帐号 这个也可以更改 从控制面板的用户界面里改就可以了 这个用户名
  • 1. Inna and Pink Pony

    1 Inna and Pink Pony 首先找出四个边界点 但要注意当横纵坐标等于边界横纵坐标时 需考虑是否会出界 满足以上条件时 考虑横纵坐标移动次数其和为偶数时便可以完成移动 因为正负抵消原则 话不多说 直接上Python代码 n m
  • 解决 CommandNotFoundError: Your shell has not been properly configured to use ‘conda activate’问题

    针对使用conda进入虚拟环境时遇到的问题 CommandNotFoundError Your shell has not been properly configured to use conda activate 解决方法 win r
  • 解决Android中使用RecyclerView滑动时底部item显示不全的问题

    感觉这个bug是不是因人而异啊 找了很多文章都没能解决我的问题 包括在RecyclerView上在嵌套上一层RelativeLayout 添加属性android descendantFocusability blocksDescendant
  • 解决“L6200E Symbol xx defined (by xx.o and xx.o)”重复定义问题

    今天来分享一个关于自己之前遇到的一个问题 就是关于重复定义会造成的一个错误 错误提示为 OBJ LCD axf Error L6200E Symbol ascii 1206 multiply defined by lcd user o an
  • C语言每日一题:7.寻找数组中心下标。

    思路一 暴力求解 1 定义一个ps作为中间下标去记录下标值 2 循环下标ps从头到位 定义四个变量分别是left sum left right sum right 3 初始化left ps 1和right ps 1 当ps0 gt 就让su
  • etcd学习和实战:4、Java使用etcd实现服务发现和管理

    etcd学习和实战 4 Java使用etcd实现服务发现和管理 文章目录 etcd学习和实战 4 Java使用etcd实现服务发现和管理 1 前言 2 代码 2 1 服务注册 2 2 服务发现 2 3 运行结果 2 4 问题 3 最后 1
  • 关于SVM的一点笔记

    关于SVM的一点笔记 一 简单了解 1 感知机 perceptron 感知机是一种类似于生物中神经细胞功能的人工神经元 它可以把一个或者多个输入 x 1 x 1 x1 x
  • flask最基础的增删改查实现步骤及代码

    分类序列化器 写入要序列化的字段 user info id fields Integer name fields String 商品序列化器 写入要序列化的字段 goods info id fields Integer name field
  • Spring系列面试题(Spring、SpringMvc、SpringBoot)

    一 springboot自动配置原理 自动装配 简单来说就是自动把第三方组件的Bean装载到Spring IOC器里面 不需要开发人员再去写Bean的配置 在Spring Boot应用里面 只需要在启动类加上 SpringBootAppli
  • 五张图带你理解 RocketMQ 顺序消息实现机制

    大家好 我是君哥 今天聊一聊 RocketMQ 的顺序消息实现机制 在有些场景下 使用 MQ 需要保证消息的顺序性 比如在电商系统中 用户提交订单 支付订单 订单出库这 3 个消息应该保证顺序性 如下图 对于 RocketMQ 来说 主要是