Apache Kafka 消费者组的偏移量如何过期?

2024-05-07

当我注意到一些奇怪的行为时,我正在对一个旧主题进行一些测试。阅读 Kafka 的日志时,我注意到这条“删除了 8 个过期的偏移量”消息:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

其实我有2个问题:

  1. 对于消费者组来说,这个偏移过期是如何运作的?

  2. 这个过期的偏移量可以解释这种行为吗?我的消费者在有任何东西时不会轮询任何东西auto.offset.reset = latest,但它从最后提交的偏移量中进行轮询auto.offset.reset = earliest ?


Update

从 Apache Kafka 2.1 开始,只要消费者组处于活动状态,偏移量就不会被删除,与消费者是否提交偏移量无关,即offset.retention.minutes时钟仅在组变空时才开始计时(在旧版本中,时钟在提交发生时直接开始计时)。

Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets

原答案

默认情况下,Kafka 在可配置的时间段后删除已提交的偏移量。参见参数offsets.retention.minutes。即,如果消费者组在这段时间内处于非活动状态(即不提交任何偏移量),则偏移量将被删除。因此,即使消费者正在运行,如果它没有提交某些分区的偏移量,这些偏移量也会受到offset.retention.minutes.

如果您启动消费者,会发生以下情况:

  1. look for a (valid) committed offset (for the consumer group)
    1. 如果找到有效的偏移量,则从那里恢复
    2. 如果没有找到有效的偏移量,则根据auto.offset.reset范围

因此,如果您的偏移量被删除并且auto.offset.reset = latest,在新数据添加到主题之前,您的消费者不会轮询任何内容。如果auto.offset.reset = earliest它应该占据整个主题。

请参阅此 JIRA 以获取有关此问题的讨论https://issues.apache.org/jira/browse/KAFKA-3806 https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682 https://issues.apache.org/jira/browse/KAFKA-4682

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Kafka 消费者组的偏移量如何过期? 的相关文章

随机推荐

  • 如何使用户输入与变量相关?

    我不知道如何准确地表达这个问题 但这就是我想要实现的目标 我正在使用堆栈实现河内塔插图 这是里面的main 功能 System out println Type the source pole number and the destinat
  • 在Python中为什么ifrank:比ifrank!= 0更快:

    当我改变的时候 for i in range 0 100 rank ranks i if rank 0 pass to for i in range 0 100 rank ranks i if rank pass 我发现第二个代码效率更高
  • 使用 qTip2 确认工具提示对话框?

    因此 当用户单击删除按钮时 我尝试创建一个小型确认对话框 内联 工具提示 我想象它看起来有点像这样 但带有小文本和 确定 和 取消 按钮 但我不是来问如何设计它的 我更喜欢使用 qTip2 作为该工作的插件 但如果你有更好的选择 我也会选择
  • SQL Server 2008 中 CLR 中的线程

    我有一个CLR运行在下面的进程SQL Server2008 它构建了多个表数据的缓存以保存在静态类中以供其他调用稍后使用 我的问题是 我可以通过生成线程来加载缓存中的每个数据集 表来改进加载此缓存的过程吗 我过去一直回避这一点 因为各种帖子
  • AutoCompleteTextView sqlite填充异常

    我的第一篇文章只有不到一半的文字 因此是第二篇 完整的 文章 我正在开发一个测试应用程序 使用 sqlite 填充两个 AutoCompleteTextView 我正在使用汽车制造商和模型进行测试 自动完成的 使自动完成 模型自动完成 ma
  • Android 中无网络、断线的情况如何处理?

    我有一个应用程序需要连接到 Internet 才能执行某些操作 但当没有可用的 Internet 时 它会崩溃 我读到 如果没有互联网 我需要使用 try catch 括号 我尝试使用它 正如您在 AsyncTask 中看到的那样 但它不起
  • 使用 Task.Run() 写入控制台失败

    我的一位同事发现我们的代码存在问题 花了一段时间才弄清楚究竟发生了什么 但可以通过这个简单的示例来最好地证明这一点 Fails class Program static void Main string args Task Run gt C
  • Android 2.x 天城文 unicode 问题

    我正在尝试使用以下代码支持 android 2 x 的梵文字体 即使 android 2 x 无法渲染梵文字体 除了 raswa 和 dirga 存在一些问题之外 代码工作正常 是否有可能在 android 2 x 中获得正确的梵文表示形式
  • 无法将 Visual Studio 项目中的多个文件提交到 subversion

    几周以来 我在使用 Subversion 时遇到了一些问题 当我尝试从 Visual Studio 2017 项目提交文件时 有些文件无法提交到我的 Visual SVN 服务器 准确地说 项目文件夹中的所有文件 如 cs config c
  • 使用 libx264 为 Raspberry pi 编译 Xuggler 时的问题 #2

    我正在尝试编译Xuggler http www xuggle com xuggler 对于 Raspberry Pi 在 Debian 操作系统上运行 又名 Raspbian 我遵循了可用的 基本构建说明 here http www xug
  • 是否有一个“AcceptsOneWidget”也“ProvidesResze”(除了“ScrollPanel”)?

    我有一个复合材料可以扩展ResizeComposite并有一个DockLayoutPanel作为它的根 我可以直接贴进去RootLayoutPanel它之所以有效是因为DockLayoutPanel ProvidesResize 但是 我想
  • 如何监控“即时发生”生成的 Google Alert RSS feed?

    我有一个 Google 快讯 我将其设置为以 RSS 源形式发送当它发生的时候 But 轮询 RSS feed 是获取 RSS feed 的唯一方法 or 当 Feed 是从 Google 发布时 有没有办法收到 Google 警报的通知
  • Flex 与 ActionScript

    我正在尝试构建 实现一个 Flash 视频播放器来播放视频 我研究了 Flex 并仅使用 VideoElement 构建了一个基本应用程序 它在没有静态链接库的情况下编译为 41k 在链接库时编译为 300k 我生成了报告 但仍然不确定为什
  • 如何通过 HttpContext 访问 Blazor Server .Net 6 中的 cookie?

    不幸的是 有关在 Blazor Server 中访问 c ookie 的教程和之前的 StackOverflow 答案似乎在新的 Net 版本中变得无效 例如 我无法获得以下任一答案 根据 net 6 中没有的评论来判断 如何在服务器端 B
  • cuda中内核的并行执行

    可以说我有三个全局数组 它们已使用 cudaMemcpy 复制到 GPU 中 但 c 中的这些全局数组尚未使用 cudaHostAlloc 分配 以便分配页面锁定的内存 而不是简单的全局分配 int a 100 b 100 c 100 cu
  • 在 Linux 2.6.21 (glibc 2.3.5) 上进行 ARP 和反向 ARP

    我需要在任意 IP 网络上存储对第三方设备的持久引用 其中设备的 IP 地址可能是静态的或由 DHCP 随机分配 我不控制网络上的设备 也不能依赖 DNS 和其他现有的或与设备一起使用的临时网络协议 所以我被指示使用硬件地址和 ARP 进行
  • Haskell 中函数和函子有什么区别?只有定义吗?

    在 Haskell 中 当编写函数时 这意味着我们将某个东西 输入 映射到另一个东西 输出 我尝试 LYAH 来理解 Functor 的定义 看起来和普通 Functor 一样 函数被称为函子有什么限制吗 Functor 是否允许有 I O
  • 将组合字符串和数字输入的元胞数组写入文本文件

    考虑以下 DateTime 2007 01 01 00 00 2007 02 01 00 00 2007 03 01 00 00 Headers Datetime Data Dat 100 200 300 Data DateTime num
  • 使用 mvel 检查 List 中是否存在元素

    我随身带着一份清单 清单就像 List
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge