RocketMQ消费者设置了instanceName属性后消息竟不翼而飞

2023-10-27

背景

RocketMQ使用过程中为了快速搭建消费服务,于是在同一个机器集群消费的方式起了多个消费者实例,结果发现部分消息没被消费到!本文是对问题产生原因的跟踪和分析,下面会将项目中遇到的问题简化成官方demo来说明。

问题重现

生产者代码

Producer.java

/*
         * Instantiate with a producer group name.
         * 默认分配4个消息队列
         */
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        producer.setNamesrvAddr("localhost:9876");
        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

启动一个producer实例发送10条消息到4个消息队列。

消息发送情况:

消息发送结果: queueId=2,消息内容: Hello RocketMQ 0
消息发送结果: queueId=3,消息内容: Hello RocketMQ 1
消息发送结果: queueId=0,消息内容: Hello RocketMQ 2
消息发送结果: queueId=1,消息内容: Hello RocketMQ 3
消息发送结果: queueId=2,消息内容: Hello RocketMQ 4
消息发送结果: queueId=3,消息内容: Hello RocketMQ 5
消息发送结果: queueId=0,消息内容: Hello RocketMQ 6
消息发送结果: queueId=1,消息内容: Hello RocketMQ 7
消息发送结果: queueId=2,消息内容: Hello RocketMQ 8
消息发送结果: queueId=3,消息内容: Hello RocketMQ 9

从发送结果可以看出消息发送的队列分配情况如下所示:
在这里插入图片描述

消费者代码

Consumer.java

/*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");

        consumer.setNamesrvAddr("localhost:9876");
        
        //自定义instanceName
        consumer.setInstanceName("XUJIAN_MACBOOK");
        
        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                System.out.printf("msgBody: %s %n",new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

本地启动两个消费者实例,即consumer启动两次,设置为集群消费模式,且两个消费者实例属于同一个消费者组

紊乱的消费结果

consumer1

ConsumeMessageThread_11 接收到新消息: queueId=0,消息内容: Hello RocketMQ 2
ConsumeMessageThread_14 接收到新消息: queueId=1,消息内容: Hello RocketMQ 6
ConsumeMessageThread_15 接收到新消息: queueId=0,消息内容: Hello RocketMQ 3
ConsumeMessageThread_16 接收到新消息: queueId=1,消息内容: Hello RocketMQ 7

consumer2

ConsumeMessageThread_6 接收到新消息: queueId=0,消息内容: Hello RocketMQ 2
ConsumeMessageThread_7 接收到新消息: queueId=1,消息内容: Hello RocketMQ 6
ConsumeMessageThread_8 接收到新消息: queueId=0,消息内容: Hello RocketMQ 3
ConsumeMessageThread_9 接收到新消息: queueId=1,消息内容: Hello RocketMQ 7

从消费结果可以看出消息消费的队列分配情况如下所示:
在这里插入图片描述

两个消费者消费了相同队列的相同消息,且部分消息没被消费到。这和预期的集群消费模式下消费者组内的消费者均分消息队列不符!

原因分析

当发现消费者消费异常时,首先应该排查消费负载均衡是否正常。

消费负载均衡

集群消费的时候会根据统一消费者组内消费者的数量队列数量以及不同的策略来为每个消费者分配要消费的消息。

消费者的默认队列分配策略是“均分”,源码如下:

/**
     * Constructor specifying consumer group.
     *
     * @param consumerGroup Consumer group.
     */
    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }

其中AllocateMessageQueueAveragely就是平均分配策略,其他的还有随机等,均实现了AllocateMessageQueueStrategy接口。

RebalanceImpl.java

该类就是消息消费均衡类。

相关核心源码如下:

public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                   //根据topic进行reblance
                   this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    ...
    //获取分配的结果
    allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
    ...
}

又回到AllocateMessageQueueAveragely.java,上文提到这个类的策略是均分,那就来看看他是怎么做的。源码如下:
AllocateMessageQueueAveragely.java

public List<MessageQueue> allocate(String consumerGroup/*消费者组*/, String currentCID/*clientId*/, List<MessageQueue> mqAll/*消息队列集合*/,
        List<String> cidAll/*消费者组里面的所有消费者的clientId*/) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        //如果消费者组里的消费者不包含当前这个消费者,直接返回
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        //当前消费者在消费者集合里面的位置
        int index = cidAll.indexOf(currentCID);
        //队列数对消费者数取模
        int mod = mqAll.size() % cidAll.size();
        //求当前消费者应该消费几个队列
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        //求当前消费者应该从哪个队列开始消费消息
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        //将当前消费者应该消费的队列一个一个放进返回结果列表
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

可以发现消费者消费哪些队列是由clientId决定的。

所以当两个消费者的clientId一样时,调用indexOf方法返回的是一样的结果,所以他们消费的队列是一样的。如上面的例子,总共有4个队列,2个消费者,所以两个消费者只消费了同样的两个队列:queueId=0、queueId=1

clientId怎么生成

上面说了消费队列负载均衡的结果和clientId有关,那clientId是怎么生成的?

构建clientId的源码如下:

    /**
     * clientId格式:ip+@+instanceName[+@unitName],通常你会看到形如127.0.0.1@32531这样的clientId
     * @return
     */
    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

clientId用来唯一标识一个MQClientInstance

可见clientId是根据instanceName属性、ipunitName(可选)生成的。

为什么会生成相同的clientId

根据上面clientId的生成规则,两个消费者都在本地启动,意味着有相同的ipunitName没有设置。

正巧两个消费者设置了相同的instanceName,那生成的clientId必然相同!,这就是问题的关键所在

解决方案

经过上面分析知道了是clientId相同是问题所在,那解决方案就是让两个消费者的clientId不相同。

根据
在这里插入图片描述
那最简单的解决方案有如下三种:

方案一:不设置instanceName属性

因为集群模式下instanceName默认值为进程id,源码如下:

    /**
     * 如果是集群消费模式,如果instanceName是默认值(即没有自定义该属性)则通过进程id来替换该属性
     */
    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }

两个消费者的进程id肯定是不同的。

方案二:两个消费者设置不同的instanceName属性

这个很容易能想到,不必多说。

方案三:两个消费者在不同的机器上启动

在这里插入图片描述
在不同机器上启动意味着ip是不一样的,也可以使生成的clientId不同。

正常的消费结果

通过上述解决方案,最终得到了正确的消费结果。

consumer1:

ConsumeMessageThread_16 接收到新消息: queueId=0,消息内容: Hello RocketMQ 2
ConsumeMessageThread_17 接收到新消息: queueId=1,消息内容: Hello RocketMQ 3
ConsumeMessageThread_18 接收到新消息: queueId=0,消息内容: Hello RocketMQ 6
ConsumeMessageThread_19 接收到新消息: queueId=1,消息内容: Hello RocketMQ 7 

consumer2:

ConsumeMessageThread_6 接收到新消息: queueId=2,消息内容: Hello RocketMQ 0
ConsumeMessageThread_7 接收到新消息: queueId=3,消息内容: Hello RocketMQ 1
ConsumeMessageThread_8 接收到新消息: queueId=2,消息内容: Hello RocketMQ 4
ConsumeMessageThread_9 接收到新消息: queueId=3,消息内容: Hello RocketMQ 5
ConsumeMessageThread_10 接收到新消息: queueId=2,消息内容: Hello RocketMQ 8
ConsumeMessageThread_11 接收到新消息: queueId=3,消息内容: Hello RocketMQ 9

10条消息被两个消费者消费完成,从消费结果可以看出消息消费的队列分配情况如下所示:
在这里插入图片描述

队列被两个消费者平均分配,但是注意,队列均分不代表消息均分!

总结

通过这次的问题跟踪排查和解决,越来越意识到对一个中间件原理甚至源码熟悉的重要性。当了解了其整体架构、运作原理以及模块源码以后就能够很快判断出大概是哪里出了问题,这最终也会沉淀为我们的个人经验。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ消费者设置了instanceName属性后消息竟不翼而飞 的相关文章

随机推荐

  • SpringBoot - 打包,war包,jar包

    一 war包 1 创建一个springboot的web应用 在src目录下创建一个 webapp目录 我们选择 file gt Project Structure gt 选择模块 选择 Web Resource Directories 新建
  • LVGL(二) SquareLine_Studio1.0.5 可视化编程环境搭建

    一 下载 SquareLine Studio1 0 5 官网链接 https squareline io CS sanDN链接 19条消息 SquareLine Studio Setup1 0 5免积分下载 智能家居文档类资源 CSDN文库
  • [antdv: FormModel] model is required for resetFields to work

    今天在用Vue ant的UI框架进行表单绑定的时候出现报错 Warning antdv FormModel model is required for resetFields to work 通过不断测试才发现是因为没有在表头使用 mode
  • QT ----Canvas绘图

    在使用qtwidget进行界面设计的时候 可以进直接使用gui模块内的控件或者使用QPainter自行绘制组件 但是在qml中QT只给我们提供了一个形状组件 rectangle 可以通过设置它的radius构成圆角矩形 但是 如果我们需要进
  • Tauri 打包

    1 第一次打包运行命令 npm run tauri build 2 可能会出现下面问题 我们需要在tauri conf json里面查找identifier这个名称 原来是com tauri dev 随便改下名字 我这里改成build了 3
  • python精彩编程200例-200G的Python初高级教程+项目实战案例源码,让你做有钱途的人才...

    2018年1月16日上午 教育部正式将人工智能 物联网 大数据处理正式划入高中新课标 这就意味着现在的学生16岁就要开始学习编程了 据统计 在所有专业级别的 39000 名开发人员中 有超过四分之一的开发人员在他们 16 岁之前就写了第一个
  • 面试官: Async是如何被JavaScript实现的

    太久没和大家见面了 因为最近业务上接了新的项目所以写文的时间被严重挤压 这篇 Async 是如何被实现的 其实断断续续已经在草稿箱里躺了很久了 终于在一个夜黑风高的周六晚上可以给他画上一个句号 引言 无论是面试过程还是日常业务开发 相信大多
  • Python 中常见的魔法方法

    什么是Python魔法方法 魔法方法是在Python的类中被双下划线前后包围的方法 如常见的 init new del 等 这些方法在类或对象进行特定的操作时会自动被调用 我们可以使用或重写这些魔法方法 给自定义的类添加各种特殊的功能来满足
  • 内网穿透代理(NPS)搭建以及使用

    与公众号同步更新公众号文章链接 内网穿透代理 NPS 搭建以及使用 部署前提需要一台公网服务器 各大云piao 获取到服务器后需要放行服务器端口 建议所有端口 ALL 新建一个NPS文件夹 mkdir NPS 进入NPS文件夹 cd NPS
  • Go语言面试题--基础语法(11)

    文章目录 1 定义一个包内 全局 字符串变量 下面语法正确的是 2 下面这段代码输出什么 3 下面这段代码输出什么 1 定义一个包内 全局 字符串变量 下面语法正确的是 A var str string B str C str D var
  • 彻底搞懂Vue中的Mixin混入(保姆级教程)

    彻底搞懂Vue中的Mixin混入 保姆级教程
  • 【IC设计】EDA palyground使用

    有时候我们在外地无法使用vivado等工具来进行Verilog编程 可以使用这个在线网站www edaplayground com 这个笔记记录一些需要注意的点 它会自动帮我们建立一个testbench sv 里面写入testbench 需
  • HTML5-画布使用教程

    1 简介画面的基本使用教程 CSS绘图和数据存储原理 橘猫吃不胖 的博客 CSDN博客
  • Python 读取txt文件每一行数据生成列表

    本意是将数据 改为如下形式 push lea push mov call mov mov pop retn mov jmp push mov mov call test jz push call add mov pop retn mov m
  • .PersistenceException: ### Error querying database.Cause: java.lang.NullPointerException

    错误 org apache ibatis io ResolverUtil Checking to see if class com wei mapper UserMapper matches criteria is assignable t
  • c++ protobuf 可能会遇到的坑

    1 发现存在内存泄露 程序退出时记得调用 google protobuf ShutdownProtobufLibrary 这里一定是在程序退出时调用 如果调用后又使用了 protobuf 会出现异常 因为protobuf 中使用构造 会有创
  • Mysql undo log

    一 基本概念 undo log有两个作用 1 为事务提供回滚 2 多版本并发控制 MVCC undo log和redo log记录物理日志不一样 它是逻辑日志 可以认为 当delete操作时 undo log记录的是insert记录 反之亦
  • 【Leetcode】1684. Count the Number of Consistent Strings

    题目地址 https leetcode com problems count the number of consistent strings 给定一个字符串 a a a和一个字符串数组 A A A 问
  • 单链表学习笔记(C语言)

    单链表学习笔记 C语言 一 说明 1 链表 所谓链表 就是用一组任意的存储单元存储线性表元素的一种数据结构 2 结构 链表的每个数据的存储都由两部分组成 1 数据元素本身 其所在的区域称为数据域 2 指向直接后继元素的指针 所在的区域称为指
  • RocketMQ消费者设置了instanceName属性后消息竟不翼而飞

    文章目录 背景 问题重现 生产者代码 消费者代码 紊乱的消费结果 原因分析 消费负载均衡 clientId怎么生成 为什么会生成相同的clientId 解决方案 方案一 不设置instanceName属性 方案二 两个消费者设置不同的ins