Kafka 的 session.timeout.ms 和 max.poll.interval.ms 之间的区别

2024-01-08

AFAIK,max.poll.interval.ms 是在 Kafka 0.10.1 中引入的。然而,目前还不清楚什么时候我们可以同时使用 session.timeout.ms 和 max.poll.interval.ms 考虑这样的用例:心跳线程没有响应,但我的处理线程由于设置了更高的值,因此仍在处理记录。但是,当心跳线程在跨越 session.timeout.ms 后关闭时,到底会发生什么。因为我在 POC 中观察到消费者重新平衡只有达到 max.poll.interval.ms 才会发生。
所以对我来说 session.timeout.ms 似乎是多余的。
相似的question https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0/39759329#39759329已发布,但没有回答这个问题。


session.timeout.ms用于通过心跳机制检测消费者故障。消费者心跳线程必须先向broker发送心跳session.timeout.ms时间到期。否则消费者被 Kafka 视为死亡并触发重新平衡。

心跳.间隔.ms:心跳与心跳之间的预期时间 使用 Kafka 的组管理工具时的消费者协调员。 心跳用于确保消费者的会话保持活动状态 并在新消费者加入或离开时促进重新平衡 团体。

会话超时时间:用于检测客户端故障的超时时间 使用 Kafka 的组管理工具。客户端定期发送 心跳向经纪人表明其活跃度。如果没有心跳 经纪人在本次会话到期之前收到 超时,然后代理将从组中删除该客户端,并且 启动重新平衡。

轮询是检查消费者健康状况的另一种机制。消费者应该调用 poll() 方法而不会过期max.poll.interval.ms。如果这个时间到期(通常长时间运行的进程会导致这个问题),消费者再次被视为死亡并触发重新平衡。

最大轮询间隔毫秒:poll() 调用之间的最大延迟 使用消费者组管理时。这设置了上限 消费者在获取更多数据之前可以空闲的时间 记录。如果在此超时到期之前未调用 poll(), 那么消费者被认为失败并且该组将在 为了将分区重新分配给另一个成员。

其他重要的一点是(从版本 0.10.1.0 开始):

rebalance.timeout = max.poll.interval.ms

由于我们为客户端提供了 max.poll.interval.ms 来处理 批量记录,这也是消费者可以使用的最长时间 在最坏的情况下预计会重新加入该团体。我们因此 建议将Java客户端中的重新平衡超时设置为相同 使用 max.poll.interval.ms 配置的值。当重新平衡开始时, 后台线程将继续发送心跳。消费者 在处理完成并且用户之前不会重新加入组 调用 poll()。从协调者的角度来看,消费者将 除非 1) 会话超时,否则不会从组中删除 未收到心跳就过期,或者 2) 重新平衡超时 过期。

所以在你的情况下,如果session.timeout.ms消费者没有心跳就过期,然后在该消费者组中启动重新平衡。重新平衡启动后,消费者组中的所有消费者都被撤销,Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在此时发送 joinGroupRequest),直到重新平衡超时到期,该超时等于max.poll.interval.ms.

在重新平衡期间,您仍然可以处理已经拥有但无法提交和获取的消息提交失败异常与此消息:

提交无法完成,因为该组已经重新平衡并且 将分区分配给另一个成员。这意味着时间 对 poll() 的后续调用之间的时间比配置的要长 max.poll.interval.ms,这通常意味着轮询循环是 花费太多时间处理消息。您可以解决这个问题 通过增加会话超时或减少最大大小 poll() 中使用 max.poll.records 返回的批次。

欲了解更多信息,您可以查看this https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 的 session.timeout.ms 和 max.poll.interval.ms 之间的区别 的相关文章

  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或

随机推荐

  • 如何获取折线图中的所有json值

    我有很多 Json 值 我将使用它们创建一个折线图 但它在图表中只显示一个值 我是 javascript 的新手 有一个想法在图表中绘制所有值 请任何人给出这个问题的 jsfiddle 示例 HTML代码 div class chart S
  • 如何从两个数组列表中删除公共值

    我们如何从两个 ArrayList 中删除公共值 假设我有两个 Arraylist 如下所示 ArrayList1 1 2 3 4 ArrayList1 2 3 4 6 7 我希望得到的结果是 ArrayListFinal 1 6 7 我该
  • 运行 Rails 代码/初始化程序但不通过 Rake

    我的应用程序不断遇到重复出现的问题 基本上 我有一些代码希望它在第一次启动服务器时运行 以检查某些内容是否已定义 例如计划 数据库中的特定列 文件的存在等 然后采取相应的行动 但是 我绝对不希望在启动 Rake 任务 或执行 生成 等操作
  • backgroundTaskHost.exe 退出并显示代码 1 (0x1)

    我正在创建一个 Windows 应用商店应用程序 该应用程序具有用于后台任务的 Windows 运行时组件 该解决方案在 Visual Studio 中构建时没有任何问题 但当触发后台任务时 它总是失败并显示消息 程序 4204 backg
  • 如何使用 Dapper.Contrib 正确“单一化”表名?

    我有一个 Net Core 3 1 控制台应用程序 在SQL Server数据库中 我有单数名称的表 与我的POCO类相同 这方便匹配和维护 对于我想使用的插入 更新和删除操作Dapper Contrib图书馆 但是 当我运行 Insert
  • Qsort 在 C++ 中不适用于哪些类型?

    std sort通过使用交换元素std swap 它又使用复制构造函数和赋值运算符 保证您在交换值时获得正确的语义 qsort通过简单地交换元素的底层位来交换元素 忽略与要交换的类型相关的任何语义 虽然qsort尽管不了解您正在排序的类型的
  • 将变换应用于 UITextView - 防止内容调整大小

    当我将旋转变换应用于UITextView然后点击里面开始编辑 看起来内容尺寸自动变宽了 内容视图的新宽度是旋转视图的边界框的宽度 例如 给定一个宽度为 500 高度为 400 的文本框 并旋转 30 度 新的内容宽度将为 500 cos 3
  • Cassandra 大量 SSTable

    启动一些长时间运行的写入作业 使用 Spark Cassandra 连接器从 Apache Spark 作业批量插入 后 Cassandra v 2 1 为目标表创建了数千个 SSTable 超过 4500 个 次要压缩阈值设置为默认值 4
  • Spark SQL - 如何将 DataFrame 写入文本文件?

    我在用Spark SQL用于读取镶木地板和写入镶木地板文件 但有些情况下 我需要写DataFrame作为文本文件而不是 Json 或 Parquet 是否有任何支持的默认方法或者我必须将该 DataFrame 转换为RDD然后使用saveA
  • Windows 版 Git:致命:早期 EOF

    昨天我安装了一个新的 Git windows 服务器 2 6 4 它与 Mac git 客户端 git 协议 运行良好 今天我正在努力让第二个客户端 Windows 7 正常工作 在尝试使其工作的过程中 我已将 Windows 服务器和客户
  • 本地主机上的 Azure Functions 代理 404

    我有一个 Azure Function App 其 URL 处有一个函数http localhost 7072 api create room以及其他功能 这个特殊的函数是一个HTTPTrigger允许匿名访问并接受GET verb Htt
  • 从 LINQpad 迁移到正确的 Visual Studio 项目?

    我正在 LINQpad 中学习 LINQ to SQL 这很棒 但是背后发生了很多我不太理解的魔法 我正在使用可选的 IQ 驱动程序连接到 Oracle 数据库 该驱动程序可以在 LINQpad 内部下载 我的查询正在运行 现在我需要将其移
  • 发布实现接口的 F# 类时的反射/C# 键入错误

    我有一个用 C 编写的接口 但在用 F 实现它时 我注意到一些奇怪的地方 F 类必须先转换为接口 然后 C 才能使用它 转换后 WPF 无法读取其属性 绑定失败且 SNOOP 无法反映它 我可以用 C 代码包装该对象 一切正常 界面 pub
  • Deflate压缩块的结构

    我在理解 Deflate 算法时遇到困难 RFC 1951 https www rfc editor org rfc rfc1951 TL DR如何解析Deflate压缩块4be4 0200 我创建了一个包含字母和换行符的文件a n在里面
  • 我应该在 Objective C 中哪里初始化变量?

    在 Objective C 中 我应该覆盖 init 方法来初始化我的变量吗 如果变量是属性 我仍然可以通过通常的方式访问它们来设置其初始值吗 在 Objective C 中 我应该覆盖 init 方法来初始化我的变量吗 是的 具体来说 指
  • Rails 3“最后”方法从 ActiveRecord 输出返回错误结果

    我的控制器中有以下代码 items Item where user id gt 1 order updated at DESC limit 2 oldest item items last 出于某种原因 我猜测这与我最近升级到 Rails
  • 缓存控制的默认值是多少?

    我的问题是 有时浏览器会过度缓存某些资源 即使我已经修改了它们 但F5之后一切都很好 我整个下午都在研究这个案例 现在我完全理解了 Last Modified 或 Cache Control 的意义 我知道如何解决我的问题issue 只是
  • 如何使用 sed 删除从第一行开始到遇到模式 '[ERROR] -17-12-2015' 之前的行?

    我需要在遇到模式 ERROR 17 12 2015 之前删除从第一行到该行的行 目前我正在尝试以下命令 但不幸的是它没有找到模式本身 sed 1 ERROR 17 12 2015 d errLog 这里有什么问题吗 其次 上面的脚本还将删除
  • 如何在python中添加换行符?

    我刚刚用 python 编写了一个程序 但输出中的语句彼此太接近 那么如何在 python 中的两个语句之间添加换行符呢 您可以打印新行字符 print n numlines
  • Kafka 的 session.timeout.ms 和 max.poll.interval.ms 之间的区别

    AFAIK max poll interval ms 是在 Kafka 0 10 1 中引入的 然而 目前还不清楚什么时候我们可以同时使用 session timeout ms 和 max poll interval ms 考虑这样的用例