Summary
Kafka 的文档和代码注释表明,当生产者设置acks
被设定为all
那么只有在以下情况下才会将 ack 发送给生产者:所有同步副本都已赶上,但是代码(Partition.Scala
, checkEnoughReplicasReachOffset
)似乎建议尽快发送 ack最小同步副本已赶上.
Details
卡夫卡文档有这个:
acks=all 这意味着领导者将等待完整的同步副本集确认记录。source
另外,查看 Kafka 源代码 -partition.scala
checkEnoughReplicasReachOffset()
有以下评论(强调我的):
请注意,只有在 requiredAcks = -1 并且我们正在等待时才会调用此方法所有副本在我们确认生产请求之前,ISR 中要完全赶上与该生产请求相对应的(本地)领导者的偏移量。
最后,这个答案在 Stack Overflow 上(再次强调我的)
此外,最小同步副本设置还指定分区需要保持同步以保持可写入的最小副本数。当生产者指定 ack(-1 / 所有配置)时,它仍然会等待来自的 ack全部同步副本此时(与最小同步副本的设置无关)。
但是当我查看 Partition.Scala 中的代码时(注意minIsr < curInSyncReplicas.size
):
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
...
val minIsr = leaderReplica.log.get.config.minInSyncReplicas
if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
if (minIsr <= curInSyncReplicas.size)
(true, Errors.NONE)
调用此函数的代码返回 ack:
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
因此,一旦同步副本集大于最小同步副本,代码看起来就会返回一个确认。然而,文档和注释表明,只有在所有同步副本都赶上后才会发送确认。我缺少什么?至少上面的评论checkEnoughReplicasReachOffset
看起来应该改变一下。