kafka ack=all 和 min-isr

2023-11-26

Summary

Kafka 的文档和代码注释表明,当生产者设置acks被设定为all那么只有在以下情况下才会将 ack 发送给生产者:所有同步副本都已赶上,但是代码(Partition.Scala, checkEnoughReplicasReachOffset)似乎建议尽快发送 ack最小同步副本已赶上.

Details

卡夫卡文档有这个:

acks=all 这意味着领导者将等待完整的同步副本集确认记录。source

另外,查看 Kafka 源代码 -partition.scala checkEnoughReplicasReachOffset()有以下评论(强调我的):

请注意,只有在 requiredAcks = -1 并且我们正在等待时才会调用此方法所有副本在我们确认生产请求之前,ISR 中要完全赶上与该生产请求相对应的(本地)领导者的偏移量。

最后,这个答案在 Stack Overflow 上(再次强调我的)

此外,最小同步副本设置还指定分区需要保持同步以保持可写入的最小副本数。当生产者指定 ack(-1 / 所有配置)时,它仍然会等待来自的 ack全部同步副本此时(与最小同步副本的设置无关)。

但是当我查看 Partition.Scala 中的代码时(注意minIsr < curInSyncReplicas.size):

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  ...
  val minIsr = leaderReplica.log.get.config.minInSyncReplicas
  if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {          
    if (minIsr <= curInSyncReplicas.size)
      (true, Errors.NONE)

调用此函数的代码返回 ack:

if (error != Errors.NONE || hasEnough) {
  status.acksPending = false
  status.responseStatus.error = error
}

因此,一旦同步副本集大于最小同步副本,代码看起来就会返回一个确认。然而,文档和注释表明,只有在所有同步副本都赶上后才会发送确认。我缺少什么?至少上面的评论checkEnoughReplicasReachOffset看起来应该改变一下。


感谢 jira-dev 邮件列表上的 Ismael。

关键点是这一行:

if(leaderReplica.highWatermark.messageOffset >= requiredOffset) {

仅当 ISR 中的所有副本都具有该特定偏移量时,高水位线才会移动。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka ack=all 和 min-isr 的相关文章

  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • 如何在java程序中获取kafka消耗滞后

    我写了一个java程序来消费来自kafka的消息 我想监控消费延迟 如何通过java获取它 顺便说一句 我用
  • 使用kafka lib反序列化PRIMITIVE AVRO KEY

    我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
  • KeeperErrorCode = /admin/preferred_replica_election 的 NoNode

    当我启动kafka时 zookeeper发生错误 INFO Got user level KeeperException when processing sessionid 0x156028651c00001 type delete cxi
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确

随机推荐

  • Brunch:分离供应商和应用程序 javascript

    我从我们的项目供应商和应用程序中制作了两个 javascript 包 我按照建议的方式执行此操作文档 如我的 brunch config js 中的这段代码所示 files javascripts joinTo js vendor js s
  • 如何在Android中像Wifi分析仪应用程序一样绘制图表?

    您好 我正在尝试开发一个现场测试应用程序 我必须检索相邻小区的信号强度等信息 所以我的问题是 如何显示具有不同相邻单元格的图表 X 轴和 Y 轴上的信号强度是实时的吗 一个例子here 我已经获得了 5 或 6 个相邻小区以及每个小区的信号
  • Mac OS X / iOS 中的正则表达式匹配表情符号

    Note 在不支持所包含表情符号的系统上 这个问题可能看起来很奇怪 这是一个后续问题如何从字符串中删除表情符号 我想构建一个正则表达式来匹配可以在 Mac OS X iOS 中输入的所有表情符号 明显的 Unicode 块涵盖了大部分表情符
  • 如何在 MVVM 中使用同一个 ViewModel 拥有多个视图?

    我对 WPF 和 MVVM 都很陌生 在尝试设置DataContext到两个单独视图中的 ViewModel 的同一实例 这是因为
  • 如何在 AngularJS 中关闭浏览器窗口

    我有一个登录表单作为单独的浏览器窗口弹出 一旦 API 验证用户已登录 我如何在 AngularJS 中关闭该登录浏览器窗口 Use window close in window服务 您可以像这样将结果广播到另一个控制器AngularJS
  • 是否可以捕获包含 Windows 7 DWM 缩略图的窗口?

    我开始相信你不能用 Windows API 做任何事 我有两个窗户 其中有一个 DWM 缩略图 我想要做的是 我希望能够将窗口屏幕的缩略图捕获到另一个窗口中 当我使用 bitblt 执行此操作时 除了缩略图之外的所有内容都会被复制 它只是位
  • 在android中画圆[关闭]

    很难说出这里问的是什么 这个问题模棱两可 含糊不清 不完整 过于宽泛或言辞激烈 无法以目前的形式合理回答 如需帮助澄清此问题以便重新打开 访问帮助中心 如何使用 Android SDK 在两点之间绘制圆 创建一个位图 然后在其画布上绘制 然
  • Postgresql base64 编码

    我需要将 db 值转换为 base64encode 我试过 select encode cast est name as text base64 from establishments 它显示错误 SQL select encode str
  • 在提交表单之前添加确认提醒

    我有一个表单允许用户从数据库中删除一些数据 我想要一些确认信息以防止意外删除 我想做以下事情 按下提交后 会弹出警告 您确定吗 如果用户点击 是 则运行脚本 如果用户点击 否 则不要提交脚本 如何才能做到这一点 我已经添加了 onSubmi
  • np.array() 和 np.asarray() 有什么区别?

    NumPy 和 NumPy 有什么区别np array and np asarray 我什么时候应该使用其中一种而不是另一种 它们似乎产生相同的输出 The 的定义asarray is def asarray a dtype None or
  • table-header-group 、 table-footer-group 属性在 Chrome 中不起作用

    这是我的代码 http furkan brove net syflm php 当我打印它时 它在 Chrome 中不起作用 我希望它在打印模式下将页眉和页脚放在每一页上 此外 在每个浏览器中 最后一个页脚位于内容的底部 但我希望它位于页面底
  • python中的完全单调插值

    我有一些数据 例如 我想拟合一条可微分的单调曲线 我试过PchipInterpolator类但在一个非常相似的图表上 结果是 这并不单调 如何将单调曲线拟合到这样的数据 以下是另一个类似图表的 y 值示例集 0 11091571190236
  • Xcode 6.3 两次构建所有 swift 文件

    我刚刚升级到 Xcode 6 3 并试图将编译时间减少到可管理的程度 我的项目中有大约 120 个 swift 文件 类 编译需要 2 3 分钟 我的项目还有两个测试目标 UnitTests and AutomatedTests Here
  • Django forms.DateInput 不应用 attrs 字段中给出的属性

    尝试通过 django 的 attrs 说明符应用时 占位符 类未设置表单 日期输入 表格是一个模型表单 并根据docs 采用与 TextInput 相同的参数 但多了一个可选参数 这是代码 widgets my date field fo
  • Javascript 中除法结果的四舍五入

    我正在 Javascript 中执行以下操作 0 0030 0 031 如何将结果四舍五入到任意位数 a 的最大数量是多少var将举行 现代浏览器应该支持一种称为toFixed 这是取自网络的例子 Example toFixed 2 whe
  • 裁剪和缩放 MTLTexture

    我可以创建一个新的吗MTLTexture尺寸w2 h2现有的MTLTexture region x1 y1 w1 h1 PS 我考虑过使用MTLTexture buffer makeTexture但偏移量需要是64字节 为什么 以下是您可以
  • 如何更改 joptionpane 的大小和字体?

    您可以更改 JOptionPane 文本的字体和大小吗 我尝试过 只有当我在该特定的 java 类上 运行文件 时它才有效 如果您启动整个项目 它不会更改字体 我只想更改特定的 JOptionPane 而不是全部 这是代码 UIManage
  • + 运算符如何用于合并委托?

    例如 delegate void SomeDelegate SomeDelegate a new SomeDelegate gt Console WriteLine A SomeDelegate b new SomeDelegate gt
  • Flexbox 溢出滚动条显示在主体而不是内部元素上

    问题 在使用带有溢出的 Flexbox 的全尺寸应用布局 100 宽度 100 高度 中 在 Firefox IE 或 Edge 中不会显示滚动条 它们在 Chrome 中确实显示正常 FF IE Edge 中不是在元素上设置垂直滚动条 而
  • kafka ack=all 和 min-isr

    Summary Kafka 的文档和代码注释表明 当生产者设置acks被设定为all那么只有在以下情况下才会将 ack 发送给生产者 所有同步副本都已赶上 但是代码 Partition Scala checkEnoughReplicasRe