Kafka消费者默认组ID

2024-03-27

我正在使用 Apache Kafka 及其 Java 客户端,我发现消息在属于同一组的不同 Kafka Consumer 之间进行负载平衡(即共享相同的组 id)。

在我的应用程序中,我需要所有消费者阅读所有消息。

所以我有几个问题:

  • 如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?

  • 是否有一个默认值?

  • 客户端每次都会创建一个随机值吗?

  • 我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?

编辑: 谢谢您的回答。

你是对的:如果没有设置消费者组id,Kafka应该抱怨。

但是,我发现如果组 id 为 null,Java 客户端会将其设置为空字符串“”以避免出现问题。 显然这就是我正在寻找的默认值。

令我所有的消费者感到惊讶的是,即使我没有设置他们的 groupId(因此它们都带有 groupId == ""),似乎也会收到生产者写入的所有消息。

我仍然无法解释这一点:有什么建议吗?


如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?

kafka消费者不会有任何消费者组。相反,您会收到此错误:The configured groupId is invalid

是否有一个默认值?

是的,您可以看到consumer.propertieskafka的文件供参考。默认的消费者组id是:group.id=test-consumer-group

客户端每次都会创建一个随机值吗?

不,对于启动 Kafka 0.9.0.x 消费者的 Java 客户端来说,groupId 似乎是必需的。你可以参考这个JIRA:https://issues.apache.org/jira/browse/KAFKA-2648 https://issues.apache.org/jira/browse/KAFKA-2648

我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?

是的,如果所有消费者都使用相同的组 ID,则主题中的消息将在这些消费者之间分发。换句话说,每个消费者将获得消息的不重叠子集。在同一组中拥有更多消费者会增加并行度和消费的整体吞吐量。另一方面,如果每个消费者都在自己的组中,则每个消费者将获得所有消息的完整副本。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka消费者默认组ID 的相关文章

  • 增加 Java 中主题的分区数量

    我正在使用名称 卡夫卡 2 12版本 2 3 0 根据我想更改的流量 负载最大分区某个主题的编号 Kafka启动后是否可以进行这种更改 并且可以通过代码完成吗 是的 您可以通过代码增加分区 使用AdminClient createParti
  • 卡夫卡消费者陷入(重新)加入组

    如果 kafka 版本 0 10 消费者尝试重新加入消费者组 其默认行为是什么 我正在将单个消费者用于消费者组 但似乎它在重新加入时受到了打击 每 10 分钟后 它会在消费者日志中打印以下行 2016 08 11 13 54 53 803
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • python 脚本在 docker 内运行时无法导入 kafka 库 [重复]

    这个问题在这里已经有答案了 我有以下 python 脚本 可以从 twitter 中提取推文并将其发送到 kafka 主题 该脚本运行完美 但是当我尝试在 docker 容器内运行它时 它无法导入 kafka 库 它说 语法错误 语法无效
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • Kafka Connect 进入重新平衡循环

    我刚刚部署了 Kafka Connect 我只使用连接源 MQTT 应用程序位于两个实例的集群上 2 个容器上 机器 现在它似乎进入了一种重新平衡循环 我一开始有一点数据 但没有新数据出现 这就是我在日志中得到的内容 2017 08 11
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • Kafka 中的“__consumer_offsets”主题是什么

    当我运行此命令时 我得到 2 个主题 我知道我创建了测试主题 但我看到了一个名为 consumer offsets 的附加主题 从名称上看 它与消费者抵消有关 但它是如何使用的呢 bin kafka topics sh list zooke
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria

随机推荐

  • 使用 jQuery 设置列宽

    我有带有格式的 html 表 div table tbody tr td Set Width 10 Px td td Set Width 20 Px td tr tr Same for all tr tr tbody table div 即
  • 为什么 IDEA 显示 JavaDoc 错误?

    param string throws ApplicationException IDEA 突出显示 字符串 并告诉 未找到文档 为什么会这样 为以下内容编写一些文档string参数或调整声明有 Javadoc 问题检查设置 以免报告此类问
  • 应用程序要求首先将程序集 microsoft.reportviewer.processingObjectModel 版本 11.0.0.0 安装在全局程序集缓存中

    我有一个在 Visual Studio 2012 中创建的小型 Windows 窗体应用程序 它使用 ReportViewer 版本 11 0 0 0 应用目标框架为 NET 4 0 部署方式为ClickOnce 在我的电脑上可以安装 但在
  • Node.JS Schema.pre('save) 不更改数据

    我正在制作用户授权系统 并希望在将密码保存到数据库之前对其进行哈希处理 为了达到这个目的 我使用 bcrypt nodejs 上面标题中的问题 var mongoose require mongoose var bcrypt require
  • 如何正确处理分块编码请求?

    我有两个网站 一个使用 PHP 的 Lighttpd 第二个使用 Apache 这两个网站都不能正确处理分块传输编码 我从我的手机 J2ME 发送此请求 并且无法将此传输类型更改为任何其他类型 所以我唯一的方法是以其他方式处理分块传输编码请
  • Android Studio 2.2 预览版在实现数据绑定时抛出错误

    我已将 android studio 更新到 2 2 预览版 1 并按指定应用了 google 和 firebase 的指定依赖项 但仍然出现以下错误 EmptyThrowable Wrong dependency type class c
  • XMLHttpRequest setRequestHeader() --> 有没有办法设置标头值,而不是附加到它?

    from http www w3 org TR XMLHttpRequest the setrequestheader method http www w3 org TR XMLHttpRequest the setrequestheade
  • 验证正整数

    我只想允许数字字段使用正整数 包括零 如何使用 JSR 303 定义此验证 I tried Min value 0 message msg1 但它允许浮点值 如 1 2 Digits fraction 0 integer 10 messag
  • 线程拥有堆栈和进程拥有堆栈的策略是什么?

    线程拥有堆栈和进程拥有堆栈的策略是什么 如果我们有 10 个进程 那么我们有多少个堆栈 10 个 如果一个进程下有 10 个线程 那么我们有多少个堆栈 1 所有线程共享同一个堆栈 Thanks 如果你考虑一下堆栈是什么 那么共享堆栈是没有意
  • Wordapp 未在线程或并行进程中关闭

    下面的代码通常可以工作 并且在将 docx doc 保存到 pdf 后打开和关闭 word 但是当在线程或并行 for 循环中使用以下代码时 它不会 有任何想法吗 我已经提供了下面的所有代码 这是在函数中使用时工作正常的代码 wordApp
  • 如何正确对整数数组进行排序

    尝试从我知道仅包含整数的数组中获取最高和最低值似乎比我想象的更难 var numArray 140000 104 99 numArray numArray sort console log numArray 我希望这能显示99 104 14
  • Eclipse 在 pom.xml 文件中显示错误:cvc-datatype-valid.1.2.1: '${MYVAR}' 不是 'boolean' 的有效值

    我有一个 Maven 项目 可以在命令行上正常构建 我想在 Eclipse Luna 4 4 1 中编辑项目文件 但是当我加载项目时 它在我的 pom xml 文件中报告以下错误 cvc datatype valid 1 2 1 MYVAR
  • 根据值将逗号分隔的数字列拆分为多列

    我有一个专栏f在我的数据框中 我想根据该列中的值扩展到多个列 例如 df lt structure list f c NA 18 17 10 12 8 17 11 6 18 12 12 NA 17 11 12 Names f row nam
  • 使用特定的 url 地址从 java 代码关闭浏览器

    1 我想使用我的java代码中的url地址关闭特定的浏览器选项卡 因为它是一个客户端服务器应用程序 我想使用客户端应用程序中的 url 地址关闭浏览器选项卡 服务器端将有一个 jar 它将与客户端请求进行通信 并从客户端获取 url 并根据
  • JSON.NET序列化没有属性名称的字典[重复]

    这个问题在这里已经有答案了 大家 我有字典属性名称的 json 序列化问题 这是我的代码 public class MyClass public string A get set public string B get set public
  • 用于调试的 YII 日志记录

    在很多情况下 Xdebug不适合调试 因为它涉及点击运行到特定的代码行 我想使用类似的东西cakePHP调试功能 供开发人员将类的特定属性的值输出到浏览器 我在用Yii framework这是我的配置yii log in the main
  • 如何实例化对象的静态向量?

    我有一个 A 类 它有一个静态对象向量 对象属于 B 类 class A public static void InstantiateVector private static vector b vector of B 在函数 Instan
  • 修改现有的 Android ROM 以控制用户操作

    我正在为客户创建一个 Android 应用程序 该应用程序将预安装并与手机一起分发 现在客户要求我锁定 ROM 以防止未来的用户使用除此应用程序之外的任何其他应用程序 IE 没有浏览 没有电子邮件 没有任何可能产生任何费用的东西等 现在 经
  • 在 CMD 批处理脚本中调用标签时如何利用超过 9 个参数?

    我想知道如何在调用标签时在批处理脚本中调用超过 9 个参数 例如 下面显示我分配了 12 个参数 并尝试回显所有这些参数 CALL LABEL one two three four five six seven eight nine ten
  • Kafka消费者默认组ID

    我正在使用 Apache Kafka 及其 Java 客户端 我发现消息在属于同一组的不同 Kafka Consumer 之间进行负载平衡 即共享相同的组 id 在我的应用程序中 我需要所有消费者阅读所有消息 所以我有几个问题 如果我没有在