RocketMQ同一Topic、消费组创建多个消费者失败问题

2023-05-16

文章目录

    • 业务场景
    • 问题复现
    • 解决方式
    • 问题跟踪

业务场景

rocketmq建议一个服务对应一个topic,但是一个服务下会有多个不同的业务消息,同时rocketmq建议不同的业务消息对应不同的tag,当SpringBoot整合RocketMQ时,设置多个消费者发生报错

问题复现

RocketMQ创建多个消费者(同一个消费组)消费同一Topic的不同tag的消息发生报错

2021-05-27 11:10:12.862 ERROR 7636 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Started container failed. DefaultRocketMQListenerContainer{consumerGroup='MyConsumerGroup', nameServer='10.90.175.160:9876', topic='TestTopic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='tag0', messageModel=CLUSTERING}

java.lang.IllegalStateException: Failed to start RocketMQ push consumer
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:281) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.registerContainer(ListenerContainerConfiguration.java:120) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at java.util.HashMap.forEach(HashMap.java:1289) ~[na:1.8.0_222]
	at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.afterSingletonsInstantiated(ListenerContainerConfiguration.java:79) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862) ~[spring-beans-5.1.8.RELEASE.jar:5.1.8.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:865) ~[spring-context-5.1.0.RELEASE.jar:5.1.0.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:548) ~[spring-context-5.1.0.RELEASE.jar:5.1.0.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at com.roy.rocketmq.RocketMQSBApplication.main(RocketMQSBApplication.java:16) ~[classes/:na]
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[MyConsumerGroup] has been created before, specify another name please.
See http://rocketmq.apache.org/docs/faq/ for further details.
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:634) ~[rocketmq-client-4.7.1.jar:4.7.1]
	at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:698) ~[rocketmq-client-4.7.1.jar:4.7.1]
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:279) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	... 13 common frames omitted

2021-05-27 11:10:12.866  INFO 7636 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2021-05-27 11:10:12.881  INFO 7636 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-27 11:10:12.892 ERROR 7636 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.RuntimeException: java.lang.IllegalStateException: Failed to start RocketMQ push consumer
	at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.registerContainer(ListenerContainerConfiguration.java:123) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at java.util.HashMap.forEach(HashMap.java:1289) ~[na:1.8.0_222]
	at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.afterSingletonsInstantiated(ListenerContainerConfiguration.java:79) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:862) ~[spring-beans-5.1.8.RELEASE.jar:5.1.8.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:865) ~[spring-context-5.1.0.RELEASE.jar:5.1.0.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:548) ~[spring-context-5.1.0.RELEASE.jar:5.1.0.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
	at com.roy.rocketmq.RocketMQSBApplication.main(RocketMQSBApplication.java:16) [classes/:na]
Caused by: java.lang.IllegalStateException: Failed to start RocketMQ push consumer
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:281) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.registerContainer(ListenerContainerConfiguration.java:120) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	... 12 common frames omitted
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[MyConsumerGroup] has been created before, specify another name please.
See http://rocketmq.apache.org/docs/faq/ for further details.
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:634) ~[rocketmq-client-4.7.1.jar:4.7.1]
	at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:698) ~[rocketmq-client-4.7.1.jar:4.7.1]
	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:279) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
	... 13 common frames omitted

解决方式

实现RocketMQPushConsumerLifecycleListener接口,重写prepareStart,consumer.setInstanceName(“testTopic-tag0”);设置唯一标识的instanceName即可

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup",
        topic = "TestTopic",
        consumeMode= ConsumeMode.CONCURRENTLY,
        selectorType = SelectorType.TAG,
        selectorExpression = "tag0")
public class Tag0Consumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener  {
    @Override
    public void onMessage(String message) {
        System.out.println("Received tag0 message : "+ message);
    }


    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        //consumer.setInstanceName("testTopic-tag0");
    }
}
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup",
        topic = "TestTopic",
        consumeMode= ConsumeMode.CONCURRENTLY,
        selectorType = SelectorType.TAG,
        selectorExpression = "tag1")
public class Tag0Consumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener  {
    @Override
    public void onMessage(String message) {
        System.out.println("Received tag1 message : "+ message);
    }


    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setInstanceName("testTopic-tag1");
    }
}

问题跟踪

发现如果不给consumer指定instanceName,rocketmq就会给此consumer设置一个默认的instanceName,如果有多个消费者都在同一个消费组里,并且不指定instanceName,通过this.consumerTable.putIfAbsent(group, consumer)拿到的消费者对象就会是同一个,就会报错

​ log.warn(“the consumer group[” + group + “] exist already.”);

​ throw new MQClientException(“The consumer group[” + this.defaultMQPushConsumer.getConsumerGroup()+ “] has been created before, specify another name please.” + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);

代码片段

DefaultMQPushConsumerImplboolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                null);
}
MQClientInstancepublic boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
    return false;
    }

    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
    log.warn("the consumer group[" + group + "] exist already.");
    return false;
    }

    return true;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ同一Topic、消费组创建多个消费者失败问题 的相关文章

  • 消息队列 RocketMQ:(九)消息重试

    文章目录 消息队列 RocketMQ 一 概述 消息队列 RocketMQ 二 系统架构 消息队列 RocketMQ 三 发送普通消息 三种方式 消息队列 RocketMQ 四 顺序消息 消息队列 RocketMQ 五 延时消息 消息队列
  • RocketMQ 安装

    镜像方式安装 首先再把上一接中提到的 RocketMQ 部署架构图看一下 从图中可以看出 RocketMQ的服务端分为两块 Name Server 和 Broker Name Server 是一个几乎无状态节点 可集群部署 在消息队列Roc
  • Centos7下基于jdk11 安装RocketMQ

    1 简介 RocketMQ是阿里巴巴中间件团队自研的一款高性能 高吞吐量 低延迟 高可用 高可靠 具备金融级稳定性 的分布式消息中间件 开源后并于2016年捐赠给Apache社区孵化 目前已经成为了 Apache顶级项目 当前在国内被广泛的
  • RocketMQ学习笔记(实操篇)

    目录 基本操作 启动 测试 双主双从集群搭建 总体架构 工作流程 服务器环境 Host添加信息 防火墙配置 环境变量配置 创建消息存储路径 broker配置文件 修改启动脚本文件 服务启动 查看进程状态 查看日志 mqadmin管理工具 使
  • 面试题篇-09-RocketMQ相关面试题

    文章目录 1 RocketMQ如何保证消息有序消费 2 RocketMQ和Kafka 快的飞起 底层存储有什么不同 2 1 什么是 零拷贝 2 1 1 传统IO数据读写 2 1 2 mmap优化 3次切换 3次拷贝 2 1 3 SendFi
  • Docker实战:docker compose 搭建Rocketmq

    1 配置文件准备 1 1 新建目录 home docker data rocketmq conf mkdir home docker data rocketmq conf 1 2 在上面目录下新建文件broker conf文件 内容如下 b
  • rockemq创建topic

    sh mqadmin updateTopic n sms pro 007 9876 sms pro 008 9876 c DefaultCluster t smsFrontSmsMq 10 w 4 r 4
  • 漏洞复现-CVE-2023-33246 Apache RocketMQ RCE漏洞原理与复现

    目录 漏洞原理 漏洞描述 影响范围 Apache RocketMQ学习 文档学习 代码审计 漏洞复现 docker环境搭建 exp代码 总结 参考 漏洞原理 漏洞描述 For RocketMQ versions 5 1 0 and belo
  • Flink RocketMQ Connector实现

    Flink内置了很多Connector 可以满足大部分场景 但是还是有一些场景无法满足 比如RocketMQ 需要消费RocketMQ的消息 需要自定时Source 一 自定义FlinkRocketMQConsumer 参考FlinkKaf
  • RocketMQ第五篇 RocketMQ API基本使用

    目录 生产者Product 消费者Consumer 前面已经学习了Rocket的基本知识 以及搭建MQ单机版和集群环境 下面开始进行实际开发 根据前面下载的RocketMQ源码 开展讲解RocketMQ 的基本使用 生产者Product 在
  • RocketMq存储设计——Index file

    RocketMq存储设计 Index file index file设计 rocket mq存储设计
  • rocketmq客户端配置

    1 客户端配置 相对于RocketMQ的Broker集群 生产者和消费者都是客户端 2 客户端寻址方式 RocketMQ可以令客户端找到Name Server 然后通过Name Server再找到Broker 如下所示有多种配置方式 优先级
  • 32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

    1 消费者 红包系统 丢失消息的问题 前面两章中 阐述了如何确保订单系统发送出去的消息一定会到达MQ中 而且也能确保了如果消息到达了MQ如何确保一定不会丢失 在整个消息的生产消费中 就剩下消费者这一端的问题了 红包系统 消费者 拿到消息后
  • RocketMQ 简介

    本文根据阿里云 RocketMQ产品文档整理 地址 https help aliyun com document detail 29532 html userCode qtldtin2 简介 RocketMQ是由阿里捐赠给Apache的一款
  • RocketMQ经典高频面试题大全(附答案)

    编程界的小学生 0 彩蛋 1 说说你们公司线上生产环境用的是什么消息中间件 2 多个mq如何选型 3 为什么要使用MQ 4 RocketMQ由哪些角色组成 每个角色作用和特点是什么 5 RocketMQ中的Topic和JMS的queue有什
  • spring整合RocketMQ

    1 看官方javademo https www apache org dyn closer cgi path rocketmq 4 2 0 rocketmq all 4 2 0 source release zip 下载下来 spring
  • RocketMQ 消息过滤

    1 简介 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件 是在Consumer端订阅消息时 再做消息过滤的 RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实 现的
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事
  • RocketMQ报No route info of this topic

    最近某天突然收到报警邮件 线上某个应用发送MQ消息报错 完整异常栈如下 2018 04 08 18 17 44 126 DubboServerHandler 10 141 6 116 20968 thread 172 ERROR com x
  • 腾讯技术工程总结-主流消息队列你了解哪些?

    文章参考 腾讯技术工程 关于消息队列的知识总结 主流消息队列你了解哪些 消息队列的发展历程 2003 年至今有很多优秀的消息队列诞生 如 kafka 阿里自研的 rocketmq 以及后起之秀 pulsar 消息队列在刚出现所需要解决的问题

随机推荐