我们如何读取给定时间范围内的Kafka主题?

2024-05-06

我需要读取 Kafka 主题中给定时间范围内的消息。我能想到的解决方案是首先找出时间范围开始的最大偏移量,然后继续消费消息,直到所有分区上的偏移量超过时间范围的末尾。有没有更好的方法来解决这个问题?谢谢!


好吧,您肯定必须首先搜索适合时间范围开头的第一个偏移量。

这可以使用以下方法完成KafkaConsumer#offsetsForTimes https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map- method.

该方法接受一个映射Map<TopicPartition, Long(timestamp)>,并返回一个Map<TopicPartition, OffsetAndTimestamp>时间戳在哪里OffsetAndTimestamp是带有时间戳的第一条消息等于或大于然后是指定的那个。

从那里,您可以将消费者分配给返回的偏移量,并进行迭代,直到记录中的时间戳超过时间范围的末尾。

一些伪代码:

static void main(String[] args) {
    String topic = args[1];
    long timestampBeginning = Long.parseLong(args[2]);
    long timestampEnd = Long.parseLong(args[3]);
    TopicPartition partition = new TopicPartition(topic, 0);

    Consumer<Object, Object> consumer = createConsumer();

    long beginningOffset = consumer.offsetsForTimes(
            Collections.singletonMap(partition, timestampBeginning))
                    .get(partition).offset();

    consumer.assign(Collections.singleton(partition)); // must assign before seeking
    consumer.seek(partition, beginningOffset);

    for (ConsumerRecord<Object, Object> record : consumer.poll()) {
        if (record.timestamp() > timestampEnd) {
            break; // or whatever
        }

        // handle record
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我们如何读取给定时间范围内的Kafka主题? 的相关文章

  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • 谁在为kafka集群设置授权

    我有一个 3 节点 Kafka 集群和 2 个用于生产者和消费者的 kafka 客户端 我已启用 SSL 身份验证 我想为集群启用授权 我已在代理节点的 server properties 中添加了以下属性 authorizer class
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 当记录处理时间超过“max.poll.interval.ms”时,在消费过程中记录/消息会发生什么?

    我的消费者设置如下 auto offset reset earliest enable auto commit true default value session timeout ms 10000 default value max po
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导

随机推荐

  • 在 HP Load Runner 的 VuGen 中加载 javai.dll 时出现错误

    当我尝试在 HP load runner 的 VuGen 中编译一个简单的脚本时 无法启动 JVM 并出现以下错误 Java VM Internal Error Getting Error Loading javai dll 我在用着 HP
  • 单元测试(有时)会因为浮点不精确而失败

    我有课Vector代表 3 维空间中的一个点 这个向量有一个方法normalize self length 1 将矢量缩小 放大为length vec normalize length length 该方法的单元测试有时由于浮点数的不精确性
  • 日期时间值如何存储在mysql中?

    我是数据库世界的新手 我正在从 java 程序发送一个日期时间值 例如 2016 04 27 09 00 00 以便将其保存到 mysql 数据库中 我的问题是如何将该值保存到 mysql 数据库表中的日期时间类型字段中 我的意思是 它是否
  • 将 pandas 数据框中的所有 inf、-inf 值替换为 NaN

    我有一个大型数据框 不同列中包含 inf inf 值 我想用 NaN 替换所有 inf inf 值 我可以逐栏这样做 所以这有效 df column name df column name replace np inf np nan 但我的
  • NumPy 数组中负数和正数岛的计数

    我有一个包含负元素块和正元素块的数组 一个更简单的例子是一个数组a看起来像 array 3 2 1 1 2 3 4 5 6 5 4 a lt 0 sum and a gt 0 sum 给我消极和积极元素的总数 但我如何按顺序计算它们 我的意
  • 类似于eternity的C++对象持久化库

    我正在寻找一个 C 对象持久库来替换永恒图书馆 http sourceforge net projects eternity it 我已经用它制作了大约一天的原型 永恒图书馆的能力不足 我创建了一个与此类似的对象层次结构 我有一个std l
  • 从后台应用程序启动活动

    我的应用程序在后台运行 我希望当运行下面的代码时该应用程序显示在 Android 手机的顶部 启动 我知道代码肯定会运行 这似乎是一件简单的事情 但我在这个网站上花了几个小时 每个人似乎都在建议这样的事情 Intent intent new
  • Angular js ng-view 渲染事件

    我如何知道 ng view 何时完全渲染 目前我尝试使用 scope on viewContentLoaded function scope on routeChangeSuccess function angular element do
  • 时间:2019-01-09 标签:c#decimaltoString()转换与逗号(,)

    c decimal toString 转换问题 Example 我有一个十进制值 1 当我使用 toString 将十进制转换为字符串时 它返回 0 10 它返回 COMMA 而不是 DOT 我相信这与您的操作系统设置的文化 地区有关 您可
  • hive查询无法通过jdbc生成结果集

    我是 Hive 和 Hadoop 的新手 在我的教程中 我想将表创建为 import java sql SQLException import java sql Connection import java sql ResultSet im
  • 如何在java中执行复合sql查询?

    如何执行以下查询并通过准备好的语句检索结果 INSERT INTO vcVisitors sid VALUES SELECT LAST INSERT ID 有没有办法同时执行这两个语句 我尝试执行以下操作 Connection con Db
  • Google Sheets - 如何将过滤功能与过滤视图结合起来

    我一直在处理一个包含 100 多行的电子表格 并发现了一种巧妙的方法来合并 隐藏 复选框 该复选框将隐藏 C 列与框旁边指定的特定值 建筑类型 匹配的任何行 为此 我首先创建了一个如下函数 FILTER Data A1 OR Data C1
  • IE 中的表格布局错误(7)

    下面是一个带有表格布局的简单 html 代码 在 FF 中 它看起来就像我认为的那样 在 IE7 中则不然 我究竟做错了什么 我该如何解决它 table cellspacing 0 cellpadding 0 border 1 tbody
  • Android - 如何通过检查已发送的项目来确定电子邮件是否已发送

    我有一个应用程序 我使用意图发送电子邮件 如下所示 TODO attach and send here try Log i getClass getSimpleName send task start String address emai
  • 如果字符串中的第一个字符是逗号,则删除它

    我需要在 javascript 中设置一个函数来删除字符串的第一个字符 但前提是它是逗号 我找到了substr函数 但这将删除任何内容 无论它是什么 我当前的代码是 text value newvalue substr 1 text val
  • 我可以将 UIScrollView 放入另一个 UIScrollView 中吗

    我有一个UIScrollView仅在垂直方向滚动 我需要放置UIScrollView它可以水平移动 就像苹果设备中的AppStore应用程序一样 我不想我们UICollectionView因为我有静态数据并且我只需 3 个水平UIScrol
  • Window.AllowsTransparent 设置为 true 时 wpf 中的运行时错误

    当我设置时 我在运行时抛出异常AllowsTransparency True 我得到一个例外 说WindowStyle不能设置为None if AllowsTransparency设置为 true 即使我明确地说WindowStyle被设定
  • LINQ:根据列值选择重复行

    我试图在我的 DataGrid 中显示那些共享相同列值的行 例如 对于具有相同姓氏的人 我尝试了以下方法 dataGrid ItemsSource dataContext Addresses GroupBy a gt a SurName W
  • 将 fill_ Between() 与 Pandas 数据系列一起使用

    我已经绘制了 使用 matplotlib 时间序列及其相关的置信区间上限和下限 我在 Stata 中计算的 我使用 Pandas 读取 stata csv 输出文件 因此该系列的类型为 pandas core series Series M
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围