如何将 Kafka 承诺的消费者偏移量更改为所需的偏移量

2024-01-08

我有卡夫卡流应用程序。我的应用程序正在成功处理事件。

如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件。我试过如何更改主题的起始偏移量? https://stackoverflow.com/questions/29791268/how-to-change-start-offset-for-topic。但我收到“节点不存在:”错误。请帮我。


您所指的问题/答案基于较旧的 Kafka 版本。从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个特殊的 Kafka 主题中,称为偏移主题(主题名称是__consumer_offsets).

从Kafka 1.0开始,命令行工具bin/kafka-consumer-groups.sh有一个允许设置偏移量的新功能。查看原始 KIP:https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

该工具也适用于 Kafka 0.11(甚至更旧的 Kafka 版本)。

另一种方法是编写自己的工具,该工具使用单个KafkaConsumer与相应的group.id,订阅您想要修改偏移量的主题,seek() and commit()偏移量。 (请注意,您应该为此使用者禁用自动提交。)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 Kafka 承诺的消费者偏移量更改为所需的偏移量 的相关文章

  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 动物园管理员服务器已启动但没有输出

    我已经在zoo cfg中设置 clientPort 2181 cloudera cloudera vm sudo usr lib zookeeper bin zkServer sh 启动 我得到以下回复 JMX enabled by def
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这

随机推荐

  • 如何在 Spring DM 中获取 BundleContext?

    我刚刚接触 Spring DM 我想访问 Bundlecontext 我的问题是我有一个包含 Activator 的捆绑包 但它没有 xml 配置文件 我无法触及这个捆绑包 那么我需要访问 BundleContext 和 serviceRe
  • CSS/Flexbox:仅显示容器中容纳的尽可能多的项目

    我的目标是构建一个面包屑样式组件 其中 显示项目的水平列表 每个项目都有最大宽度 如果宽度不足以显示项目 则从列表开头省略项目 优先考虑last列表中的项目 所有布局仅通过 CSS 实现 没有 JS 调整大小观察器等 Flexbox 似乎是
  • 如何检查Selenium WebDriver下载的文件?

    我使用 C 在 Selenium webdriver 中编写了一个自动化测试 其中一个步骤需要从服务器下载 XLSX 文件 如何验证文件是否已成功下载并获取其名称 Regards 我通过以下源代码找到了解决方案 string current
  • 包含服务器上任何位置的文件

    我的网站在生产服务器上完美运行 我已将其移至另一个网络服务器 VPS 让我用例子来解释一下 目录结构 includes header php business index php some other files index2 php 在我
  • 插入python数组以最小化元素之间的最大差异

    插值一维数组以使元素之间的最大差异最小化的简洁且可读的方法是什么 例如 如果我有数组 4 9 13 25 并且允许我再添加 1 个数字以最小化元素之间的最大差异 我会在 13 和 25 之间插入 19 最大差异现在是 6 而不是 12 当然
  • 在 MVC 中使用 ASP.NET 服务器控件?

    在我当前的项目中 我需要添加一项功能 允许用户查看其上传的 PDF 的缩略图 我找到了一个方便的组件 http www tallcomponents com pdfthumbnail aspx就实现了这一点 基本版本是免费的 但足以满足我当
  • 为什么原始类型有一个“类”,它是如何使用的?

    谈论 Java 7 您可以获得一个基本类型的类 如下所示 Class classOfInt int class 对于每一个 您都会得到一个名为原始类型的 类 int class gt int byte class gt byte doubl
  • 使用类包装器和 __new__ 装饰类

    Code import functools class MyInt1 int def new cls x value print MyInt1 new cls x value return super new cls x base 2 de
  • std::vector 的性能不佳是否是由于未调用 realloc 对数次数所致?

    EDIT 我又添加了两个基准测试 以比较 realloc 与 C 数组的使用以及 Reserve 与 std vector 的使用 从最后的分析看来 realloc 的影响很大 即使只调用了 30 次 检查文档 我猜这是因为 realloc
  • sql group_concat 和子查询

    我有 2 个 mysql 表 car model id int Primary Key title varchar id brand int FK to car brand table car car model relation many
  • 使用 Devise 添加 TOS 协议复选框

    我们正在为我们的会员使用设备 我们已将 tos agreement 字段 布尔值 添加到我们的成员架构中 并将其添加到views devise registrations new html haml 在Member模型中 我们有一个验证规则
  • 返回类型为空

    我正在使用 PHP 7 测试返回类型 我创建了一个简单的脚本来测试 PHP 7 的返回类型
  • bash - 递增包含字母的变量

    我有一组有效字符 0 9a z 和一个分配了其中一个字符的变量 我想要做的是能够将该变量增加到集合中的下一个变量 如果需要 我可以处理 特殊 情况 即从 9 增加到 a 从 z 增加到 但我不知道如何增加字母 bin bash y b ec
  • 在文本框上手动设置不显眼的验证错误

    我正在做类似于远程验证的事情 除了我已经通过 jquery 手动进行调用并设置我必须设置的任何内容 现在我的问题是 如果我想告诉验证器特定的文本框无效 并阻止页面提交 突出显示文本框等 我将如何从代码中做到这一点 Html LabelFor
  • 带有自定义 API 视图的 Django Rest 框架分页

    我正在尝试将分页添加到我的项目中 但找不到任何明确的文档或教程 我有一份办公室清单 楷模 办公室 py class Office Model name CharField name default None max length 255 n
  • python 中的 while 循环只要正则表达式匹配

    好的 我知道这可能不是在循环中使用正则表达式编辑字符串的最佳方法 只是为了感兴趣 我将如何构建一个循环 只要匹配就执行正则表达式模式 在循环中运行并在不再命中时停止 我在 python 中做这个 match re search r patt
  • OxyPlot - 如何删除轴

    我想创建一个没有任何可见轴的 Oxyplot 视图 谁能告诉我该怎么做 为了避免误解 我从未在绘图模型中添加任何轴 此代码已经添加了轴 如何避免它们被显示 C plot new PlotModel var ser new LineSerie
  • FirebaseRecyclerAdapter 和 Android 上的多种项目类型

    我想添加 2 种类型 有图像 没有图像 in my recyclerview 我知道我需要重写方法FirebaseRecyclerAdapter但我不知道怎么做 请帮我解决这个问题 Override public int getItemVi
  • Spark Streaming 2.0.0 - 在负载下几天后冻结

    我们在带有 Spark 2 0 0 的 AWS EMR 5 0 0 上运行 从 125 个分片 Kinesis 流中使用 使用 2 个消息生成器提供 19k 个事件 秒 每条消息大小约为 1k 使用 20 台机器组成的集群进行消费 该代码有
  • 如何将 Kafka 承诺的消费者偏移量更改为所需的偏移量

    我有卡夫卡流应用程序 我的应用程序正在成功处理事件 如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理 跳过事件 我试过如何更改主题的起始偏移量 https stackoverflow com questions 29791