在Kafka中如何根据生产时间获得准确的偏移量

2024-01-09

我需要每天每小时获取 Kafka 生成的消息。每隔一小时我就会启动一个作业来消费 1 小时前生成的消息。例如,如果当前时间是 20:12,我将在 19:00:00 到 19:59:59 之间消费该消息。这意味着我需要在时间 19:00:00 之前获取开始偏移,并在时间 19:59:59 之前获取结束偏移。我使用了SimpleConsumer.getOffsetsBefore,如「0.8.0 简单消费者示例 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example」。问题是返回的偏移量与作为参数给出的时间戳不匹配。例如当 make timestamp 19:00:00 时,我收到在时间 16:38:00 生成的消息。


下面的kafka消费者api方法getOffsetsByTimes()可以用于此,它可以从 0.10.0 版本或更高版本开始使用。看JavaDoc https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在Kafka中如何根据生产时间获得准确的偏移量 的相关文章

  • 格式化时间戳

    如何将 Rails 时间戳格式化为更易于理解的格式 如果我只是打印出来created at or updated at在我看来是这样的 然后我会得到 2009 03 27 23 53 38 世界标准时间 The strftime http
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 如何使用 JavaScript 将时间戳字符串转换为本地时间?

    我有一个 JSP 页面 其中我将存储在数据库中的时间戳作为字符串提取 其形式如下Thu Aug 21 2014 22 09 23 GMT 0530 India Standard Time 当然 我可以按原样在页面中显示它 但是我一直在寻找
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 按时间戳字段中的日期过滤结果

    我已经获得了一些帮助 但不确定为什么这不起作用 我正在尝试使用表单让用户过滤他们的活动 存储在数据库中 My code GET from 01 11 2013 GET to 25 11 2013 from DateTime createFr
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • 如何在 JavaScript 中将日期时间微格式转换为本地时间?

    我有一个页面当前正在使用日期时间微格式 http microformats org wiki datetime design pattern显示时间戳 但我只显示我自己的时区的人类可读时间
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 获取参考成员(非 POD)的偏移量

    这是代码片段 include
  • mysql 如何将 varchar(10) 转换为 TIMESTAMP?

    我已将所有日期存储到数据库中varchar 10 现在我想将它们转换为 TIMESTAMP 当我运行sql时 ALTER TABLE demo3 CHANGE date date TIMESTAMP NOT NULL 它提醒 1292 In
  • MySQL 正在将我的时间戳值转换为 0000-00-00

    我是 PHP 新手 目前仍在学习中 我认为我的注册表有问题 username password email全部成功插入MySQL registered and last seen不要 我以为我正在使用getTimestamp 错了 但它呼应
  • KafkaConsumer.commitAsync() 行为的偏移量比以前更低

    kafka 将如何处理调用 KafkaConsumer commitAsync Map
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache

随机推荐

  • 如何用javascript从pdf文件中提取文本? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 所以我想知道是否有一种方法可以在javascript中从pdf中提取文本 我已经调查了一些 npm 模块 例如 PDF TO TEXT
  • 如何在Windows不分配驱动器号的情况下创建分区?

    我正在尝试通过 Windows API 对附加的虚拟硬盘进行初始化和分区 我已经成功使用设备Io控制 http msdn microsoft com en us library windows desktop aa363216 28v vs
  • Google Adwords CSP(内容安全政策)img-src

    中包含哪些域 协议img src是否需要 Content Security Policy 标头指令才能允许 Google AdWords 转化跟踪 从测试来看 当我们打电话时google trackConversion 看起来浏览器会创建一
  • JDK16 和 Mac OS 上的 Mockito - 无法初始化插件

    java lang IllegalStateException Could not initialize plugin interface org mockito plugins MockMaker alternate null Cause
  • 测试 angular2 dart 组件

    我写了一个组件并想测试它 如何从组件编写测试 有没有 Angular dart 的测试框架 您可以使用test https pub dartlang org packages test包含实验测试实现的包https github com d
  • 防止我的nodejs服务器中的xhr攻击[关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我正在开发一个将与客户端移动应用程序
  • Redux 中间件工作一半,但未完全取消操作

    我有一些简单的中间件 可以工作 但也不能工作 基本上我有一个用户列表 我正在尝试删除一个 然后与 firebase 同步 一切都很好 我添加了一些中间件 以便当用户删除一个中间件时 它会询问您是否确定 只需使用一个简单的alert目前 如果
  • 基于 WPF DataGrid 中的 DataGridTemplateColumn 设置列样式

    我正在使用 WPF DataGrid 其中如果该行可编辑 其中一列需要显示 编辑 超链接 这由该行的支持模型中的布尔标志指示 我能够使用 DataGridTemplateColumn 实现这一点 没有问题 然而 对整行的附加要求是在选择该行
  • 使按钮在鼠标悬停时透明

    我正在使用 Microsoft Visual Studio Express 2012 构建 Metro 风格应用程序 我对此应用程序非常陌生 需要帮助 我在 XAML 中定义了一个按钮 按钮背景是从图像设置的 将鼠标悬停在按钮上时 会将其背
  • 错误 Microsoft.Web.Infrastruct,版本 = 1.0.0.0,文化 = 中性,PublicKeyToken = 31bf3856ad364e35

    我有一个小型网络应用程序 在我在应用程序中添加两个 genericHandler 之前 它工作得很好 我对 http 处理程序进行了以下更改
  • 在Python中从操作系统生成一个随机单词

    我正在用 Python 创建一个刽子手游戏 我希望能够生成一个随机单词 我总是可以列出一个列表 但如果可能的话 我希望不必手动写下所有单词 Python 中有函数 读取内置 MacOS 字典的方法吗 Thanks 大多数 Unix 类似 系
  • 有没有办法自动创建 Mongo 编解码器?

    我愿意将我的代码从 mongojack 迁移到支持新的异步 mongo 驱动程序的代码 然而我发现新的编码 解码方式是通过Codec我不认为自己在写Codec对于我的模型中的每个类 这就是为什么我宁愿编写一个库 给定一个类创建一个Codec
  • Webpack - 从非模块文件加载函数?

    假设我有一个文件 nonModuled js A non moduled file let s say I can t module it console log 0 function go a console log go a 我还有另一
  • 在jsp中获取上下文路径时出现问题?

    我的jsp位于以下位置 http myApp com myWebApp customer images customer jsp 我的图像 通过 customer jsp 访问 位于 http myApp com myWebApp imag
  • 使用滚动时间间隔来计算 R 和 dplyr 中的行数

    假设我有一个时间戳数据帧 其中包含当时售出的相应门票数量 Timestamp ticket count time int 1 2016 01 01 05 30 00 1 2 2016 01 01 05 32 00 1 3 2016 01 0
  • 通过 Julia 中的递归调用减少 JIT 时间

    我有一个递归函数 它操作整数二叉树 实现为一对嵌套的对或整数 我的函数创建一棵具有不同结构的新树 并递归调用自身直到满足某些条件 我发现的问题是 第一次运行代码时 需要花费很长时间来 JIT 编译该函数的所有可能的签名 之后运行良好 这是最
  • 图片链接块占据页面的整个宽度

    我有一个主要div作为宽度为 90 的容器 在顶部里面 我有一个标题 图片 height 5em display block and margin auto 我的 HTML 代码设置如下 a href img a 当我单击图片左侧时 我仍然
  • 对除输入之外的所有内容禁用选择[type=text]

    我需要禁用网页上除 input type text 元素之外的所有内容的选择 这个接受的答案 https stackoverflow com a 7109491 13087类似的问题几乎可以解决问题 但它不会禁用包含 input type
  • 将php日期转换为适合mysql数据库

    我想将输入日期转换为以下形式dd mm yyyyMySQL 格式为yyyy mm dd 我试图使用date Y m d strtotime POST date 但问题是输出总是Y d m 我认为因为它认为我的第二个论点是mm dd yyyy
  • 在Kafka中如何根据生产时间获得准确的偏移量

    我需要每天每小时获取 Kafka 生成的消息 每隔一小时我就会启动一个作业来消费 1 小时前生成的消息 例如 如果当前时间是 20 12 我将在 19 00 00 到 19 59 59 之间消费该消息 这意味着我需要在时间 19 00 00