session.timeout.ms
用于通过心跳机制检测消费者故障。消费者心跳线程必须先向broker发送心跳session.timeout.ms
时间到期。否则消费者被 Kafka 视为死亡并触发重新平衡。
心跳.间隔.ms:心跳与心跳之间的预期时间
使用 Kafka 的组管理工具时的消费者协调员。
心跳用于确保消费者的会话保持活动状态
并在新消费者加入或离开时促进重新平衡
团体。
会话超时时间:用于检测客户端故障的超时时间
使用 Kafka 的组管理工具。客户端定期发送
心跳向经纪人表明其活跃度。如果没有心跳
经纪人在本次会话到期之前收到
超时,然后代理将从组中删除该客户端,并且
启动重新平衡。
轮询是检查消费者健康状况的另一种机制。消费者应该调用 poll() 方法而不会过期max.poll.interval.ms
。如果这个时间到期(通常长时间运行的进程会导致这个问题),消费者再次被视为死亡并触发重新平衡。
最大轮询间隔毫秒:poll() 调用之间的最大延迟
使用消费者组管理时。这设置了上限
消费者在获取更多数据之前可以空闲的时间
记录。如果在此超时到期之前未调用 poll(),
那么消费者被认为失败并且该组将在
为了将分区重新分配给另一个成员。
其他重要的一点是(从版本 0.10.1.0 开始):
rebalance.timeout = max.poll.interval.ms
由于我们为客户端提供了 max.poll.interval.ms 来处理
批量记录,这也是消费者可以使用的最长时间
在最坏的情况下预计会重新加入该团体。我们因此
建议将Java客户端中的重新平衡超时设置为相同
使用 max.poll.interval.ms 配置的值。当重新平衡开始时,
后台线程将继续发送心跳。消费者
在处理完成并且用户之前不会重新加入组
调用 poll()。从协调者的角度来看,消费者将
除非 1) 会话超时,否则不会从组中删除
未收到心跳就过期,或者 2) 重新平衡超时
过期。
所以在你的情况下,如果session.timeout.ms
消费者没有心跳就过期,然后在该消费者组中启动重新平衡。重新平衡启动后,消费者组中的所有消费者都被撤销,Kafka 等待所有仍在向 poll() 发送心跳的消费者(通过轮询消费者在此时发送 joinGroupRequest),直到重新平衡超时到期,该超时等于max.poll.interval.ms
.
在重新平衡期间,您仍然可以处理已经拥有但无法提交和获取的消息提交失败异常与此消息:
提交无法完成,因为该组已经重新平衡并且
将分区分配给另一个成员。这意味着时间
对 poll() 的后续调用之间的时间比配置的要长
max.poll.interval.ms,这通常意味着轮询循环是
花费太多时间处理消息。您可以解决这个问题
通过增加会话超时或减少最大大小
poll() 中使用 max.poll.records 返回的批次。
欲了解更多信息,您可以查看this https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread.