卡夫卡偏移量管理

2024-01-18

我们正在使用 Kafka 0.10...我在网上(和文档中)看到了一些相互矛盾的信息,这些信息涉及当enable.auto.commit 为 TRUE 时如何在 kafka 中管理偏移量。检索消息的同一个 poll() 方法是否也按配置的时间间隔处理提交?

如果我在单线程应用程序中从 poll 检索消息,在同一线程中处理消息直至完成(包括处理错误),这意味着在处理完成之前不会再次调用 poll() ,那么我认为没有恐惧丢失消息,对吗?仅当 poll() 在后续调用中尝试提交时(当然,如果 auto.commit.interval.ms 已通过),这才有效。如果在收到消息后立即完成提交(在我的应用程序处理消息之前),这对我们不起作用......

这很重要,因为我想确定如果我们使用自动提交策略,我们不会丢失消息。重复的消息对我们来说是可以容忍的,但我们不能容忍丢失的数据。

感谢您的澄清!


检索消息的同一个 poll() 方法是否也按配置的时间间隔处理提交?

是的。 (如果enable.auto.commit=true.)

如果我在单线程应用程序中从 poll 检索消息,在同一线程中处理消息直至完成(包括处理错误),这意味着在处理完成之前不会再次调用 poll() ,那么我认为没有恐惧丢失消息,对吗?

Yes.

仅当 poll() 在后续调用中尝试提交时才有效(当然,如果 auto.commit.interval.ms 已通过)

这正是它的完成方式。

请参阅此处了解更多详细信息:http://docs.confluence.io/current/clients/consumer.html http://docs.confluent.io/current/clients/consumer.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

卡夫卡偏移量管理 的相关文章

  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或

随机推荐

  • GDI+ 通用错误 ASP.NET MVC

    我遇到了 GDI 通用错误我已经尝试了每个人所说的方法 即确保包含正在读取的图像文件的文件夹 如下所示 public ImageResult ProfileAsset string profile int width int height
  • 如何处理 Last.FM 或 Wikipedia 等社区 URL 样式?

    我试图理解我应该如何与角色一起工作URLs 这是因为我正在构建一个网站 用户可以在其中存储内容并通过在URL 所以 像Wikipedia or Last FM网站 我在网站上看到 用户可以写类似的内容http it wikipedia or
  • 我必须使用哪个 Windows 注册表项来安装 Delphi Expert?

    我需要为 Delphi IDE 专家创建一个安装程序 所以我找到了这两个注册表项 HKEY CURRENT USER Software Embarcadero BDS x 0 Known IDE Packages HKEY CURRENT
  • pd.read_html() 导入列表而不是数据框

    I used pd read html 从网页导入表格 但 Python 不是将数据构建为数据帧 而是将其导入为列表 如何将数据导入为数据框 谢谢你 代码如下 import pandas as pd import html5lib url
  • 在 Haskell 类型类中记录选择器

    我想实施一个Type Class有几个默认方法 但我收到一个错误 我无法使用record selectors inside type classes定义 下面的代码基本上创建了type class它定义了add函数 它应该添加一个元素到re
  • 在 ubuntu 12.04 上编译和构建 qt4

    我正在尝试从源代码编译和构建 Qt 4 8 3 当我执行 configure 时 收到以下错误消息 configure 183 configure QMAKE CXX print substr 0 index 0 1 not found E
  • Magento 1.7 验证码模块

    在新的 Magento 版本中 系统 gt 配置 gt 客户配置 gt 验证码 中的验证码选项我创建了一个名为 Signmeup 的新表单 但它似乎不起作用 我无法让它显示 目前该块未显示在页面上 不是动态页面 具有核心法师启动的静态页面
  • 获取字符串形式的对象属性名称

    是否可以获取字符串形式的对象属性名称 person person first name Jack person last name Trades person address person address street Factory 1
  • 仅显示 0-90% 或 0-95% 百分位

    这是我的代码和绘图结果 由于一些异常值 x 轴很长 有没有一种简单的方法可以过滤df fooR 中仅 0 90 或 0 95 百分位数 以便我只能绘制正常值 谢谢 df lt read csv Downloads foo tsv sep t
  • PostgreSQL - 当 UPDATE 失败时返回行内容

    当使用 PostgreSQL 更新行时 通过 UPDATE 语句 可以使用以下命令取回修改后的行内容RETURNING 我想知道当更新因约束而失败时是否有任何方法可以获取未修改的行 例如 执行以下命令 使用RETURNING 不返回当前行值
  • 在 Windows 7 64 位上安装 Android SDK:“未找到 JDK”? [复制]

    这个问题在这里已经有答案了 可能的重复 Android SDK安装找不到JDK https stackoverflow com questions 4382178 android sdk installation doesnt find j
  • “文件指针”、“流”、“文件描述符”和...“文件”之间的区别?

    有一些相关的概念 即文件指针 stream and 文件描述符 我知道一个文件指针是一个指向数据类型的指针FILE 例如声明FILE h and struct FILE h 我认识一个文件描述符 is an int 例如成员 fileno
  • 联系人选择器查询是否需要 read_contacts 权限,具体取决于 Android 版本?

    我们一直在尝试使用 Android 联系人选择器 以允许用户在应用程序不需要 READ CONTACTS 权限的情况下选择联系人 以下意图似乎适用于 4 0 ICS 设备以及 2 3 3 模拟器 Intent contactPickerIn
  • /**在c++中是什么意思

    我试图在 c 中添加块注释 但我做了一个 type o 并写道 代替 我注意到里面的评论变成了粗体 有谁知道这意味着什么 这种形式的评论被使用Doxygen http www doxygen nl 该软件使人们能够生成源代码文档 请参阅网站
  • 如何从jsp/servlet发送电子邮件?

    如何从 JSP servlet 发送电子邮件 是否需要下载一些 jar 或者您可以在没有任何 jar 的情况下从 JSP servlet 发送电子邮件吗 我的 Java 代码会是什么样子 我的 HTML 代码会是什么样子 如果有的话 是否需
  • in_array() 性能优化

    我有以下条件 if in array needle haystack in array needle somePostfix haystack in array needle someOtherPostfix haystack and so
  • 使用键:值对填充选择框?

    我正在使用 jQuery 并让服务器代码返回以下值 0 SELECT ONE 1 VALUE1 2 VALUE2 etc 如何将其填充到选择框中 var 0 SELECT ONE 1 VALUE1 2 VALUE2 targetSelect
  • ArrayAdapter:按索引删除

    我有一个 ListView 其中填充了新闻服务器概要 只是故事列表 和一个用于修改该 ListView 的 arrayAdapter 我可以通过 remove Object 函数删除项目 但是如果有多个 Object 实例怎么办 remov
  • VssUnauthorizedException VS30063 您无权访问突然抛出错误

    我有以下使用 VSTS 客户端 API 的代码 该代码可以正常工作 但现在返回错误 Microsoft VisualStudio Services Common VssUnauthorizedException VS30063 您无权访问h
  • 卡夫卡偏移量管理

    我们正在使用 Kafka 0 10 我在网上 和文档中 看到了一些相互矛盾的信息 这些信息涉及当enable auto commit 为 TRUE 时如何在 kafka 中管理偏移量 检索消息的同一个 poll 方法是否也按配置的时间间隔处