Kafka Consumer Group Id 和消费者重新平衡问题

2024-04-01

我正在使用卡夫卡0.10.0和动物园管理员3.4.6在我的生产服务器中。我有 20 个主题,每个主题大约有 50 个分区。我总共有 100 个消费者,每个消费者订阅了不同的主题和分区。所有消费者都具有相同的 groupId。那么,如果针对特定主题添加或删除消费者,那么附加到不同主题的消费者也会经历重新平衡吗?

我的消费者代码是:

public static void main(String[] args) {
        String groupId = "prod"
        String topicRegex = args[0]
        String consumerTimeOut = "10000"
        int n_threads = 1
        if (args && args.size() > 1) {
            ConfigLoader.init(args[1])
        }
        else {
            ConfigLoader.init('development')
        }
        if(args && args.size() > 2 && args[2].isInteger()){
            n_threads = (args[2]).toInteger()
        }

        ExecutorService executor = Executors.newFixedThreadPool(n_threads)
        addShutdownHook(executor)
        String zooKeeper = ConfigLoader.conf.zookeeper.hostName
        List<Runnable> taskList = []
        for(int i = 0; i < n_threads; i++){
            KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut)
            taskList.add(example)
        }
        taskList.each{ task ->
            executor.submit(task)
        }
        executor.shutdown()
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
    }

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) {

        Properties props = new Properties()
        props.put("zookeeper.connect", a_zookeeper)
        props.put("group.id", a_groupId)
        props.put("zookeeper.session.timeout.ms", "10000")
        props.put("rebalance.backoff.ms","10000")
        props.put("zookeeper.sync.time.ms","200")
        props.put("rebalance.max.retries","10")
        props.put("enable.auto.commit", "false")
        props.put("consumer.timeout.ms", consumerTimeOut)
        props.put("auto.offset.reset", "smallest")
        return new ConsumerConfig(props)

    }

public void run(String topicRegex) {
        String threadName = Thread.currentThread().getName()
        logger.info("{} [{}] main Starting", TAG, threadName)
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>()
        List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex),1)
        ConsumerConnector consumerConnector = consumer

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
            List<Object> batchTypeObjList = []
            String topic
            String topicObjectType
            String method
            String className
            String deserialzer
            Integer batchSize = 200
            while (true){
                boolean hasNext = false
                try {
                    hasNext = consumerIterator.hasNext()
                } catch (InterruptedException interruptedException) {
                    //if (exception instanceof InterruptedException) {
                    logger.error("{} [{}]Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage())
                    throw interruptedException
                    //} else {
                } catch(ConsumerTimeoutException timeoutException){
                    logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage())
                    topicListMap.each{ eachTopic, value ->
                        batchTypeObjList = topicListMap.get(eachTopic)
                        if(batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                            def dbObject = topicConfigMap.get(eachTopic)
                            logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic)
                            className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                            method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                            int sleepTime = 0
                            if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                                sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            topicListMap.put(eachTopic,batchTypeObjList)
                            sleep(sleepTime)
                        }
                    }
                    consumer.commitOffsets()
                    continue
                } catch(Exception exception){
                    logger.error("{} [{}]Exception: {}", TAG, threadName, exception.getMessage())
                    throw exception
                }
                if(hasNext) {
                    def consumerObj = consumerIterator.next()
                    logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition())
                    topic = consumerObj.topic()
                    DBObject dbObject = topicConfigMap.get(topic)
                    logger.debug("{} [{}] topic name: {}", TAG, threadName, topic)
                    topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                    deserialzer = KafkaConfig.DEFAULT_DESERIALIZER
                    if(KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)){
                        deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                    }
                    className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    if(dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                        batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                    else
                        batchSize = 1
                    Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message())
                    int sleepTime = 0
                    if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                        sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                    if(isBatchJob == true){
                        batchTypeObjList = topicListMap.get(topic)
                        batchTypeObjList.add(queueObj)
                        if(batchTypeObjList.size() == batchSize) {
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            sleep(sleepTime)
                        }
                        topicListMap.put(topic,batchTypeObjList)
                    } else {
                        executeMethod(className, method, queueObj)
                        sleep(sleepTime)
                    }
                    consumer.commitOffsets()
                }
            }
            logger.debug("{} [{}] Shutting Down Process ", TAG, threadName)
        }
    }

任何帮助将不胜感激。


每当消费者离开或加入消费者组时,整个组都会经历重新平衡。由于该组跟踪其成员订阅的所有主题的所有分区,因此您的想法是正确的,这可能会导致未订阅相关主题的消费者重新平衡。

请参阅下面的一个小测试来说明这一点,我有一个具有两个主题 test1 (2 个分区)和 test2 (9 个分区)的代理,并且正在启动两个消费者,两个消费者都具有相同的消费者组,每个消费者仅订阅这两个主题之一主题。正如您所看到的,当consumer2加入该组时,consumer1会撤销并重新分配所有分区,因为整个组会重新平衡。

Subscribing consumer1 to topic test1
Starting thread for consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread for consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Consumer Group Id 和消费者重新平衡问题 的相关文章

  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • Kafka Connect Confluence S3 Sink 连接器:找不到类 io.confluence.connect.avro.AvroConverter

    使用此 Kafka Connect 连接器 https www confluence io hub confluenceinc kafka connect s3 https www confluent io hub confluentinc
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache

随机推荐

  • 何时在 Angular + Java 项目中使用 DTO 和 Matpstruct

    好吧 我有一个大项目 我想把它做好 但我什至不知道规范是什么 Problem 我有几个具有关系的实体 我需要将它们一起显示在页面上 在视图中显示前 3 个事件 凡是Event有关系到事件实例 a 取消政策 并且对多个Pricing 截止日期
  • 获取用户在当前之前浏览过的页面

    代替 login php ref http mysite com lastpage 我可以通过其他方式获得之前的页面吗 尝试了 HTTP REFERER 但无法让它工作 Notice Undefined index HTTP REFERER
  • 渲染 svg 文件并使用 Express 提供服务

    我想知道如何使用 Express 渲染和提供 svg 文件 现在 我可以将 svg 作为原始 XML 文件提供 这是我正在做的事情 route router get status function req res next res setH
  • Rails 中的共享 JS(咖啡)

    如果我想在不同文件之间共享一些 JavaScript 函数应用程序 资产 javascript组织目录结构的最佳方法是什么 假设我有共享 js 咖啡 sharedFunction gt Hello 现在 我如何在其他地方使用它 就像这里一样
  • Solr 复制和 Solr 云有什么区别?

    我支持 Rails 项目 其中包含 Rails 应用程序和 Solr 的附加实例 我的环境 rails 3 2 1 ruby 2 1 2 sunspot 2 1 0 Solr 4 1 6 Problem 云提供商不稳定 我不能使用其他云提供
  • Pandas groupby - 计算相对点的距离

    假设我有一些看起来像这样的东西 df pd DataFrame Event A A A A A B B B B B Number 1 2 3 4 5 6 7 8 9 10 Ref False False False False True F
  • Nexus One - Android 2.1 版本,2.1 的 SDK 在哪里?

    搭载 2 1 操作系统的 Nexus Android 手机今天上市 我的朋友 刚刚订购了两个 隔夜运送 我想这意味着明天或后天就会在他手中 人们在他们的生活中拥有 2 1 版本 这怎么可能接受呢 在开发人员接触 SDK 之前就动手 我已经有
  • Selenium:查找基本 URL

    我在不同的机器上使用 Selenium 来自动测试 MVC Web 应用程序 我的问题是我无法获取每台机器的基本网址 我可以使用以下代码获取当前网址 IWebDriver driver new FirefoxDriver string cu
  • 80 端口上的 heroku + nginx

    我正在尝试在 heroku 免费环境中启动 nginx 服务器 我准备了任何操作方法和教程 但我无法运行它 首先 我想在端口 80 上启动 nginx 作为默认 Web 服务器 然后我想将 nginx 配置为 underline expre
  • 3个表之间的内连接

    我在数据库中有这些表 country id country 1 USA 2 Brazil 和段表 id country 1 USA 2 Brazil 我有第三张表 Id segment id country id 其中segment id是
  • GSON 转换为 LinkedHashMap 而不是我的对象

    我有这段代码 public abstract class Repository
  • SQL:查找表中缺失的层次结构文件夹(路径)

    我有一个包含文件夹路径的表 我需要找到层次结构中这些文件夹之间的所有 间隙 我的意思是 如果表包含这 3 个文件夹 A A B C A B C D E F G 我需要在层次结构中找到以下丢失的文件夹 A B A B C D A B C D
  • Sonar 5.1 问题列表 - 如何按问题类型分组

    我们如何识别当前代码库中项目中最常见的问题类型 我们最近从 Sonar 4 5 升级到 5 1 在4 5中 我们曾经查看特定项目中的问题列表 并且问题按问题类型分组 例如 在一个项目中 规则 使用记录器记录此异常 可能是最常见的关键规则 有
  • 如何检查mysql中的二进制字符串是否为UTF-8?

    我找到了一个 Perl 正则表达式 可以检查字符串是否为 UTF 8 正则表达式来自w3c site http www w3 org International questions qa forms utf 8 en php field m
  • matplotlib 中的等高线图显示不正确的线型

    我正在使用轮廓图在 matplotlib 中绘制一个具有正值和负值的二维矩阵 它应该显示正值的实线和负值的虚线 loc matplotlib ticker MaxNLocator 20 Z psi lvls loc tick values
  • 在SSIS中使用执行进程任务和WinSCP进行SFTP传输

    我有一个User file txt文件放置在 WinSCP 根文件夹 USERDATA 中 我正在尝试将其下载到给定位置C User Local Executable C Program Files x86 WinSCP WinSCP ex
  • React 组件安装两次

    在我的 React Redux ReactRouterV4 应用程序的一小部分中 我有以下组件层次结构 Exhibit Parent ExhibitOne ExhibitTwo ExhibitThree 在 Exhibit 的子级中 还可以
  • 使用远程执行配置程序时,当 instance_count 大于 2 时,Terraform 会卡住

    我正在尝试使用 null resource 通过 Terraform 的远程执行配置程序来配置多个 Windows EC2 实例 terraform v Terraform v0 12 6 provider aws v2 23 0 prov
  • 如何使用 WriteEndElement 检查元素的名称

    我正在写 xmlXmlWriter http msdn microsoft com en gb library system xml xmlwriter aspx 我的代码有很多这样的部分 xml WriteStartElement pay
  • Kafka Consumer Group Id 和消费者重新平衡问题

    我正在使用卡夫卡0 10 0和动物园管理员3 4 6在我的生产服务器中 我有 20 个主题 每个主题大约有 50 个分区 我总共有 100 个消费者 每个消费者订阅了不同的主题和分区 所有消费者都具有相同的 groupId 那么 如果针对特