什么情况下endOffset>lastMsg.offset+1?

2024-01-31

Kafka 对于分区返回 endOffset 15,但可以使用的最后一条消息的偏移量为 13,而不是我期望的 14。我想知道为什么。

The 卡夫卡文档 https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) read

在默认的read_uncommissed隔离级别下,结束偏移量是高水位线(即最后成功复制的消息的偏移量加一)。对于read_commited消费者来说,结束偏移量是最后一个稳定偏移量(LSO),它是高水位线的最小值和任何打开事务的最小偏移量。

这是 kafkacat 的输出。我使用 kafkacat,因为它可以打印消息偏移量:

$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Reached end of topic TK [0] at offset 15: exiting

同样令人困惑的是——而且很可能是相关的——偏移量不是连续的,尽管我还没有设置压缩等。

更多细节:

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Topic: TK       PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: TK       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: TK       Partition: 1    Leader: 0       Replicas: 0     Isr: 0

通过 kafka-console-consumer.sh 打印密钥:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
  --offset earliest --partition 0 --timeout-ms 5000 \
  --property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 10 messages

N.B. This topic has been produced to without involvement of transactions, and *) consumption is being done in read_uncommitted mode.

*) Actually, processing.guarantee is set to exactly_once_beta, so that would amount to using transactions.


更多信息事实证明,我可以使用我的 Streams 应用程序可靠地重现这种情况(1. 擦除 kafka/zookeeper 数据,2. 重新创建主题,3. 运行应用程序),其输出是显示此问题的主题。 同时,我已将 Streams 应用程序精简为这种无操作拓扑,并且仍然可以重现它:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: TK)
      <-- KSTREAM-SOURCE-0000000000

News同时,我已将本地运行的 Kafka 代理 (2.5.0) 替换为在 Docker 容器中运行的代理 (wurstmeister/kafka:2.13-2.6.0)。问题仍然存在。

该应用程序使用版本为 6.0.1-ccs 的 kafka 库,对应于 2.6.0。


您应该避免对偏移量进行计算,Kafka 确保任何新的偏移量只会大于上一个偏移量。您可能希望使用密钥并通过验证已收到正确数量的密钥来跟踪您是否已收到适当数量的消息。

卡夫卡有很多事情需要处理 https://www.confluent.io/blog/transactions-apache-kafka/例如 Exactly-Once 语义、重新发送消息以及与该主题相关的其他内部任务。这些消息将被丢弃(不会与您共享)。您只会看到您的消息,并且这些消息偏移量只会增加。

这些事务标记不会暴露给应用程序,但由处于 read_commissed 模式的消费者使用,以过滤掉已中止事务中的消息,并且不返回属于打开事务一部分的消息

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

什么情况下endOffset>lastMsg.offset+1? 的相关文章

  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时

随机推荐

  • Eigen::Ref<> 类的正确用法

    Eigen 引入了 Ref 类 以便在不需要编写模板函数时以 Eigen 对象作为参数编写函数 而无需使用不必要的临时变量 人们可以读到这一点here http eigen tuxfamily org dox TopicFunctionTa
  • 如何使用 htaccess 转发查询字符串?

    目前 我正在使用它来重写 URL RewriteEngine on RewriteRule page php name 1 NC So mysite com home被重写为mysite com page php name home 我怎样
  • Express:POST请求后Req.body未定义

    我在 app router 之前使用了express bodyParser 并且标题似乎是正确的 但我仍然在 req body 上未定义 var app express app use express bodyParser app use
  • ADO.NET 与 ADO.NET 实体框架

    ADO NET 和 ADO NET 实体框架哪个更快 没有什么比 ADO NET 数据读取器更快了 实体框架也在 地下室 使用了它 然而 实体框架可以帮助您从数据库映射到对象 对于 ADO NET 您必须自己完成此操作 这取决于你如何编程它
  • Dexie:如何添加到嵌套对象中的数组

    我正在使用 Dexie IndexedDB 包装器 并且尝试将一个对象添加到嵌套对象内的现有数组中 结构如下所示 Name John age 33 tags skill first NET second JAVA third special
  • Base64 编码数据有哪些缺点?

    在过去两年左右的时间里 我对多种类型的网络数据进行了 Base64 编码 图像 otf文件 文本等 它很实用 因为它作为一种临时的资产整合方法 数据直接嵌入CSS或HTML中 不必担心死链接 但是使用这种方法有什么缺点吗 Base64 编码
  • 使用sqlite、flutter应用程序导入数据库

    我的主要目标是使用 sqlite sqlflite 插件 将数据库导入到 flutter 应用程序中 而不必将所有创建表硬编码到应用程序代码中 我希望信息能够在本地访问手机 在飞行模式下从手机访问数据 这就是我使用 sqlite 的原因 我
  • Javascript - ReactJS - 以 ReadableStream 作为源显示图像

    我有 PNG 图像作为 ReadableStream 我需要使用此 ReadableStream 作为源显示 html 元素 img 我应该将其转换为某种东西吗 或者我该怎么做 谢谢你的答案 来自谷歌的docs https develope
  • 在另一个工厂内使用一个工厂 AngularJS

    我有一个模块 angular module myModule 然后是工厂 angular module myModule factory factory1 function some var s and functions 然后是另一家工厂
  • VB 中 .NET MVC3 Razor 视图中的命名空间引用?

    如何使用 Razor 视图引擎引用 NET MVC3 中的命名空间 据我所知 这可以在 C 中完成 using Namespace 然而在VB中这似乎不起作用 Imports Namespace 我说的是 vbhtml 文件内部 您正在使用
  • 二次读取法

    我必须为二次类编写一个读取方法 其中二次以 ax 2 bx c 的形式输入 该类的描述是这样的 添加一个读取方法 要求用户提供标准格式的方程并正确设置三个实例变量 因此 如果用户输入 3x 2 x 则将实例变量设置为 3 1 和 0 这将需
  • 如何创建字节数组并用随机数据填充它[重复]

    这个问题在这里已经有答案了 我想根据给定大小创建一个填充随机数据的字节数组 我该怎么做 我的方法的签名如下所示 private byte GetByteArray int sizeInKb 这是我尝试过的 private byte GetB
  • iOS 与 IPv6 和 Azure 的问题

    我们的 Xamarin iOS 应用程序之一被拒绝 因为 Service URI 似乎无法从 IPv6 网络获得 从 2016 年 6 月 1 日开始 Apple 希望所有 iOS 应用程序在纯 IPv6 网络中完全兼容 Microsoft
  • 快速检索 WKInterfaceLabel 的文本

    如何在 WatchKit 中获取 Swift 或 Objective C 中的标签文本 该类不是 UILabel 而是 WKInterfaceLabel 我也尝试过搜索苹果的类库 https developer apple com libr
  • 如何使用 NHibernate 锁定模式在更新之前锁定对象?

    首先让我说明一下我想要实现的目标 我有一张桌子 里面装满了工作 有一个 Web 服务 其方法允许更改作业数据 称为SaveJob 此方法检索作业及其所有数据 对新数据运行验证 这需要对其他表进行一些数据库查询 然后将其保存回数据库 有点慢
  • 静态解析类型参数的深入描述

    我正在寻找对静态解析类型参数的非常全面的审查 它们到底可以做什么 它们的局限性是什么 使用它们的效果是什么 它们如何与普通类型参数结合 以及实例级内联成员如何工作 规范本身对这个主题的介绍很少 只是顺便提到了它们 它没有提到具有此类类型参数
  • 如何屏蔽 Jenkins Pipeline 项目中的密码字段?

    当密码属性定义在Jenkinsfile properties parameters password name KEY description Encryption key 每次执行管道时 Jenkins 都会提示用户提供其值 我希望这个参
  • ADO.NET 数据服务的数据压缩

    我有一个由 NET 应用程序 不是 IIS 公开的 ADO NET 数据服务 该服务由 NET 客户端应用程序使用 对此数据服务的某些调用会返回大量数据 我想压缩传输中的 XML 数据以减少负载并提高性能 这可能吗 我假设你是使用 WCF
  • 错误:膨胀类 com.google.android.material.textfield.TextInputLayout

    我收到错误 这是我的 XML 文件
  • 什么情况下endOffset>lastMsg.offset+1?

    Kafka 对于分区返回 endOffset 15 但可以使用的最后一条消息的偏移量为 13 而不是我期望的 14 我想知道为什么 The 卡夫卡文档 https kafka apache org 28 javadoc org apache