如何重置 Kafka 偏移量以匹配尾部位置?

2024-02-08

我们将 Storm 与 Kafka 和 ZooKeeper 结合使用。我们遇到过这样的情况:我们必须删除一些主题并用不同的名称重新创建它们。除了现在读取新主题名称之外,我们的 Kafka spouts 保持不变。但是现在,当尝试从新主题读取数据时,spouts 使用旧主题分区的偏移量。因此 my-topic-name 分区 0 的尾部位置将为 500,但偏移量将约为 10000。

有没有办法重置偏移位置,使其与主题的尾部匹配?


有多种选择(如 Storm 的KafkaSpout不提供任何API来定义起始偏移量)。

  1. If you want to consumer from the tail of the log you should delete old offsets
    • depending on you Kafka version
      • (0.9之前)你可以操纵ZK(这有点棘手)
      • (0.9+) 或者您尝试删除主题中的偏移量__consumer_offsets(这也很棘手,可能也会删除您想要保留的其他偏移量)
    • 如果没有偏移量,您可以使用自动偏移重置策略“最新”或“最大”(取决于您的 Kafka 版本)重新启动您的 spout
  2. as an alternative (which I would recommend), you can write a small client application that uses seek() to manipulate the offset in the way you need them and commit() the offsets. This client must use the same group ID as you KafkaSpout and must subscribe to the same topic(s). Furthermore, you need to make sure that this client application is running a single consumer group member so it get's all partitions assigned.
    • 为此,您可以查找日志末尾并提交
    • 或者您提交无效的偏移量(如-1)并依赖自动偏移量重置配置“最新”或“最大”(取决于您的Kafka版本)

对于 Kafka Streams,有一个“应用程序重置工具”可以执行类似的操作来操作已提交的偏移量。如果您想了解一些详细信息,可以阅读这篇博文http://www.confluence.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/ http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

(免责声明:我是这篇文章的作者,它是关于 Kafka Streams 的——尽管如此,底层的偏移量操作思想是相同的)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何重置 Kafka 偏移量以匹配尾部位置? 的相关文章

  • Android Toast 消息不起作用

    我正在通过 Andengine 为 Android 开发游戏 我有 MainActivity 类和 GameScene 类 我在 GameActivity 中使用 Toast 消息 它正在发挥作用 Toast makeText this H
  • 如何在ArrayList中的特定位置插入对象

    假设我有一个大小为 n 的对象的 ArrayList 现在我想在特定位置插入另一个对象 假设在索引位置 k 大于 0 且小于 n 并且我希望索引位置 k 处及其之后的其他对象向前移动一个索引位置 那么有没有什么方法可以直接在Java中做到这
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • Java 小程序在 Mac 上闪烁

    这个问题很奇怪 问题并非在每个平台上都会发生 我在使用 MacOSX 的 Google Chrome 中出现了这种情况 但在 Safari 中却没有出现这种情况 对于使用 Windows 的朋友来说 在 Google Chrome 上运行得
  • 如何从 Java 访问 Windows 设备管理器中的信息?

    我有一个串行 USB 设备 并且其中多个设备可以连接到计算机 我需要查询和检索设备连接到的 COM 端口列表 在 Windows 设备管理器中 您可以获得当前连接的设备的 COM 端口 友好名称 该列表是动态的 从注册表中读取不工作 htt
  • JAXB 继承冲突 - 重新注释子类

    目前我的项目中有这样的环境 public abstract class Foo private List
  • Apache Thrift Java-Javascript 通信

    我正在编写一个基于 Apache Thrift 的 Java 服务器 它将从 Javascript 客户端接收数据 我已经完成了 Java 服务器 但问题是我可以获得 Javascript 客户端的工作示例 我无法找到一个好的示例 构建文档
  • 为什么通过 方法向 List 添加元素(类型正确)会出现编译错误? [复制]

    这个问题在这里已经有答案了 我对泛型通配符概念几乎没有疑问 1 假设我有一个方法 void write List
  • 动物园管理员服务器已启动但没有输出

    我已经在zoo cfg中设置 clientPort 2181 cloudera cloudera vm sudo usr lib zookeeper bin zkServer sh 启动 我得到以下回复 JMX enabled by def
  • Maven WebApp META-INF context.xml

    我正在使用 Maven 3 并且尝试在 webapp 文件夹下添加 META INF 文件夹 所以我正在尝试执行以下操作 src main webapp META INF context xml WEB INF 下面是我的 POM 文件
  • 绘制平滑曲线

    我想创建更平滑的曲线 而不仅仅是线角 这是我现在画的图 这是我的代码 case FREEHAND float pts float ptk ptk new float 2 imageMatrix invert inv if mCurrentS
  • 如何自动转换十六进制代码以将其用作 Java 中的 byte[]?

    我这里有很多十六进制代码 我想将它们放入 Java 中 而不需要向每个实体附加 0x 喜欢 0102FFAB 和我必须执行以下操作 byte test 0x01 0x02 0xFF 0xAB 我有很多很长的十六进制代码 有什么办法可以自动做
  • Java 中如何验证字符串的格式是否正确

    我目前正在用 Java 编写一个验证方法来检查字符串是否是要更改为日期的几种不同格式之一 我希望它接受的格式如下 MM DD YY M DD YY MM D YY 和 M D YY 我正在测试第一种格式 每次它都告诉我它无效 即使我输入了有
  • 膨胀类 android.support.design.widget.NavigationView 时出错

    我按照 NavigationView 的教程进行操作 但无法解决此错误消息 Error inflating class android support design widget NavigationView 教程链接 https www
  • JPA Web 应用程序管理策略

    我们目前正在开发一个 J2EE Web 应用程序 使用 JPA 作为我们的数据访问层 我们目前正在研究几种不同的策略来在我们的应用程序中利用缓存 Create an EntityManager per request 在请求范围内获取缓存
  • Hibernate 标准接受 %% 值

    我正在使用下面的 Hibernate 代码来过滤workFlowName crt add Restrictions like workFlowName workFlow MatchMode ANYWHERE crt is the crite
  • java.lang.UnsatisfiedLinkError - android studio gradle 中的 NDK?

    文件夹结构 app main java jni Android mk Application mk hello jni c res 在构建 gradle apply plugin com android application androi
  • 在java中执行匿名pl/sql块并获取结果集

    我想执行匿名 PL SQL 并需要获取结果集对象 我得到了可以通过在 PL SQL 块内使用游标来完成的代码 但 PL SQL 块本身将以文本形式来自数据库 所以我无法编辑该 PL SQL 块 并且它只会返回两个值 其列名始终相同 它将返回
  • Firebase:用户注册后如何进行电话号码验证?

    所以我知道我可以使用电子邮件验证或电话号码验证 但我想做的是在用户注册或登录后进行电话号码验证 如何连接这两种身份验证方法 最后 Firebase中是否有一个函数可以检查用户是否通过电话号码验证 谢谢 即使用户已通过身份验证 您仍然可以使用
  • Android ClassNotFoundException:在路径上找不到类

    10 22 15 29 40 897 E AndroidRuntime 2561 FATAL EXCEPTION main 10 22 15 29 40 897 E AndroidRuntime 2561 java lang Runtime

随机推荐

  • 在支持多租户的 Django 中正确加载 Keras 模型

    我尝试在 django 中编写一个 REST api 它使用 Keras 模型返回预测 但是 那load model 函数需要一些时间来加载模型 我不希望我的用户必须等待这么长时间 每次初始化模型时 初始化模型的正确方法是什么 以便加载一次
  • 链接到 libcuda.so 时出现问题

    我被一个似乎很容易诊断和解决的问题所困扰 我有一个使用 CUDA 驱动程序 API 的 C 源文件 当我使用它编译时nvcc 可执行文件已生成并且可以正常运行 但是 当我尝试使用它来编译它时g 链接器抱怨对符号的未定义引用 usr lib
  • JSF 和 Spring 性能与较差的 JSP 性能

    我看到我的同事开发了一些 JSF 项目 这些项目在我看来速度非常慢 有人有同样的看法吗 我目前正在使用 jsp jstl 和 jQuery 作为 富 客户端 我想知道现代框架 jsf wicket tapestry 相对于旧的普通 jsp
  • 使用 XPATH 子字符串函数

    我需要帮助弄清楚这个 XPATH 子字符串函数 但我无法使用它 我尝试过这种方法的各种变体 a contains text Home href substring jsessionid 12 尝试从此标签获取 jsessionid 值 a
  • Web 服务与 WCF

    我正在从事 ASP NET 应用程序 NET 4 框架 设计 想了解使用 Web 服务与 WCF 技术的优缺点和最佳实践是什么 该应用程序最终将被外部客户端用来消费数据 您什么时候会使用 WebServices 什么时候会使用 WCF 其中
  • 如何以高质量保存绘图?

    我用情节制作的每张图表在保存之前看起来都很棒 所以图像看起来有点哑光 如果有意义的话 质量真的很差 有谁知道如何高质量保存它 您可以使用此基本图表作为示例 library plotly x lt c 1 100 random y lt rn
  • 使用 Apache 的 .htaccess 使子目录不受密码保护

    目前在我的服务器上 我的 Web 目录的根目录中有一个 htaccess 文件 AuthUserFile path to root www htpasswd AuthType Basic AuthName Economic Complexi
  • 架构问题:使用依赖注入导致垃圾 API

    我正在尝试创建一个类 它执行各种与数据库相关的低级操作 但为 UI 层提供了一个非常简单的界面 此类表示全部位于特定聚合根内的一堆数据 由单个 ID int 检索 构造函数有四个参数 public AssetRegister int cas
  • App_Data 文件夹中的图像未显示在浏览器中

    当我将图像 URL 属性设置为 App Data 文件夹中的 asp 图像控件时 图像显示在页面设计视图中 但不显示在浏览器中
  • JPA/Hibernate 中键“PRIMARY”的重复条目

    我有一个many to manymysql 数据库中的关系 Module
  • 变量前的美元符号

    我有这个示例代码 用于从现有数据框 my data 创建新数据框 new data new data NULL n 10 this number correspond to the number of rows in my data con
  • 我怎样才能在我的桌子上有一个圆形边框和边框折叠:折叠? [复制]

    这个问题在这里已经有答案了 我有以下内容 table style border 1px solid 999 thead tr style background color red th Weekday th th Date th th Ma
  • 如何将struct从合约A传递到合约B?最佳实践

    我发现这样 当创建一个具有结构的通用接口时 然后合约A和B继承该具有结构的接口 但我想知道是否还有其他方法 是否存在可以更新具有结构的合约的情况 pragma experimental ABIEncoderV2 pragma solidit
  • 确定隐马尔可夫模型中隐藏状态的数量

    我正在学习隐马尔可夫模型 用于对 t 个图像帧序列中的运动进行分类 假设每个帧有 m 个维度的特征 然后我将它聚集成一个符号 用于可观察的符号 我为 k 类创建 k 个不同的 HMM 模型 那么 如何确定每个模型的隐藏状态数量以优化预测 顺
  • 密码保护 iPhone 应用程序

    我正在启动一个新应用程序 我想知道如何需要密码才能打开它 我正在考虑一个UIActionSheet在应用程序中didFinishLaunchingWithOptions应用程序委托实现文件的方法 但我不确定如何去做 不过我会继续努力 Fou
  • AngularFire $add 操作导致浏览器冻结

    我正在使用 angularjs 学习 Firebase 从本教程https thinkster io tutorials angularfire realtime slack clone creating the channels side
  • 在 Java 中对双精度值进行哈希处理

    我想知道如何在 Java 中对 double 进行哈希处理 我已经散列了其他原始数据和对象 我想我可以使用 hashcode 方法吗 从我所看到的来看 这看起来相当复杂 我遇到了一些关于创造种子的事情 我想知道关于如何解决这个问题的任何想法
  • 制作通用数组是不好的做法吗?替代方案是什么?

    我在学校用 C 编码已经三年了 两天前我开始用 Java 编码 我的问题是 制作通用数组是不好的做法吗 另一种选择是什么 我很困惑 除了做一些奇怪的事情 例如这个例子 之外 我似乎无法制作通用数组 Class implementing th
  • 对象中对象中的 JavaScript `this`?

    抱歉 帖子标题模糊 我无法为这篇文章制定正确的英文名称 例如我有这样一个对象 var APP a 1 b function return this a 这样 如果我打电话console log APP b than this将引用 APP
  • 如何重置 Kafka 偏移量以匹配尾部位置?

    我们将 Storm 与 Kafka 和 ZooKeeper 结合使用 我们遇到过这样的情况 我们必须删除一些主题并用不同的名称重新创建它们 除了现在读取新主题名称之外 我们的 Kafka spouts 保持不变 但是现在 当尝试从新主题读取