Kafka如何获取topic最近n条消息

2023-11-09

问题来源

项目运行中我们经常需要诊断个个环节是否正确,其中到kafka就需要查看最新的消息到达kafka没有,达到的内容是什么,这就需要查看kafka指定topic的最近的n条消息(将kakfa消息全部打印出来非常耗时而且不必要)。当然我们可以使用第三方提供的kafka工具查看,可是使用这些工具耗时费力而且不能很好集成到项目中。

备注:第三方工具包括kakfa命令行工具一起其他第三方的工具。
注意事项:本博客所有代码是为了介绍相关内容而编写或者引用的,示例代码并非可直接用于生产的代码。仅供参考而已。

大体思路

我们知道如果使用KafkaConsumer类,要么从topic最老的消息开始,要么从最新的消息开始。下面是官方文档中关于auto.offset.reset 的解释。

The default is “latest,” which means that lacking a valid offset,
the consumer will start reading from the newest records (records that were written after the consumer started running).
The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition,
starting from the very beginning.

因此直接使用KafkaConsumer就行不通了。 但是我们可以先通过给指定coisumerGroup设置offset,然后在让KafkaConsumer以该coisumerGroup读取数据。这样就能从指定offset开始读取消息了。

具体代码

完整的代码在这里,欢迎加星和fork。 谢谢!

package com.yq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * 本例子试着读取当前topic每个分区内最新的30条消息(如果topic额分区内有没有30条,就获取实际消息)
 * className: ReceiveLatestMessageMain
 *
 * @author EricYang
 * @version 2019/01/10 11:30
 */
@Slf4j
public class ReceiveLatestMessageMain {
    private static final int COUNT = 30;
    public static void main(String... args) throws Exception {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yq-consumer12");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");


        System.out.println("create KafkaConsumer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        AdminClient adminClient = AdminClient.create(props);
        String topic = "topic01";
        adminClient.describeTopics(Arrays.asList(topic));
        try {
            DescribeTopicsResult topicResult = adminClient.describeTopics(Arrays.asList(topic));
            Map<String, KafkaFuture<TopicDescription>> descMap = topicResult.values();
            Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> itr = descMap.entrySet().iterator();
            while(itr.hasNext()) {
                Map.Entry<String, KafkaFuture<TopicDescription>> entry = itr.next();
                System.out.println("key: " + entry.getKey());
                List<TopicPartitionInfo> topicPartitionInfoList = entry.getValue().get().partitions();
                topicPartitionInfoList.forEach((e) -> {
                    int partitionId = e.partition();
                    Node node  = e.leader();
                    TopicPartition topicPartition = new TopicPartition(topic, partitionId);
                    Map<TopicPartition, Long> mapBeginning = consumer.beginningOffsets(Arrays.asList(topicPartition));
                    Iterator<Map.Entry<TopicPartition, Long>> itr2 = mapBeginning.entrySet().iterator();
                    long beginOffset = 0;
                    //mapBeginning只有一个元素,因为Arrays.asList(topicPartition)只有一个topicPartition
                    while(itr2.hasNext()) {
                        Map.Entry<TopicPartition, Long> tmpEntry = itr2.next();
                        beginOffset =  tmpEntry.getValue();
                    }
                    Map<TopicPartition, Long> mapEnd = consumer.endOffsets(Arrays.asList(topicPartition));
                    Iterator<Map.Entry<TopicPartition, Long>> itr3 = mapEnd.entrySet().iterator();
                    long lastOffset = 0;
                    while(itr3.hasNext()) {
                        Map.Entry<TopicPartition, Long> tmpEntry2 = itr3.next();
                        lastOffset = tmpEntry2.getValue();
                    }
                    long expectedOffSet = lastOffset - COUNT;
                    expectedOffSet = expectedOffSet > 0? expectedOffSet : 1;
                    System.out.println("Leader of partitionId: " + partitionId + "  is " + node + ".  expectedOffSet:"+ expectedOffSet
                    + ",  beginOffset:" + beginOffset + ", lastOffset:" + lastOffset);
                    consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(expectedOffSet -1 )));
                });
            }

            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("read offset =%d, key=%s , value= %s, partition=%s\n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        } finally {
            adminClient.close();
            consumer.close();
        }
    }
}

运行效果

效果图解释:
我们只读取topic01每个分区最近的30条消息,如果该partition没有30条,那就读取全部的。 可以看到partition 0有39条消息,因此从第8条开始读取。

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka如何获取topic最近n条消息 的相关文章

  • C++:偏移到 std::vector 迭代器的正确转换是什么?

    我有一个函数需要std vector双精度数 并将它们复制到另一个向量 但以特定的偏移量 假设有足够的空间 void copy stuff const std vector
  • Excel图表动态范围选择

    我有一个客户对 Excel 工作表设置有一个简单但复杂的请求 但我不知道从哪里开始 我正在画一个空白 我们有一个数据范围 例子 Quarter Data 2010Q1 1 2010Q2 3 2010Q3 4 2010Q4 1 我在此基础上建
  • SQL Server LAG() 函数计算行之间的差异

    我是 SQL Server 新手 对 lag 函数有一些疑问 我必须计算两个用户活动之间的平均距离 以天为单位 然后 我必须对所有用户进行分组 计算每个用户的行之间的所有日期差异 最后选择该组的平均值 Just to be clear I
  • 如何在编译时确定元组元素的偏移量?

    我需要确定元组中某个索引元素的偏移量compile time 我尝试了这个功能 复制自https stackoverflow com a 55071840 225186 接近尾声 template
  • VBA/Excel 中行和列范围偏移的最大值是多少?

    我正在使用 microsoft excel 2003 执行以下 If 语句时收到 应用程序定义或对象定义错误 如果 Range MyData CurrentRegion Offset i 0 Resize 1 1 Value Range M
  • Kafka基础—3、Kafka 消费者API

    一 Kafka消费者API 1 消息消费 当我们谈论 Kafka 消费者 API 中的消息消费时 我们指的是消费者如何从 Kafka 主题中拉取消息 并对这些消息进行处理的过程 消费者是 Kafka 中的消息接收端 它从指定的主题中获取消息
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 我有一个 has_many 关系,我想设置自定义限制和偏移量。以及计算它们

    Hy My code profile images 我只想一次只获取 10 张图像 偏移量为 10 就像这样 profile images limit gt 10 offset gt 10 不是这样的 has many images lim
  • jQuery Offset 返回负值

    我有一个像下面这样的场景 在我的用户界面中 我将有一个文本框 如果我在文本框中输入了数字 我需要向下滚动到相应的页码 In Dom 我将有一些带有各自 id 的 div 如果用户输入页码为 5 我将检查 dom 中的第 5 个 div 偏移
  • 卡夫卡消费者偏移最大值?

    我正在谷歌搜索并阅读 Kafka 文档 但我无法找到消费者偏移量的最大值以及最大值之后是否存在偏移量环绕 我知道 offset 是一个 Int64 值 所以最大值是 0xFFFFFFFFFFFFFFFF 如果存在wraparound Kaf
  • 未定义的偏移量:1

    在我当前的 PHP 脚本中出现此错误 未定义的偏移量 1 我的代码在这里 query SELECT item id username item content FROM updates ORDER BY update time DESC L
  • 如何在编译时计算类成员的偏移量?

    给定 C 中的类定义 class A public methods definition private int i char str 是否可以使用 C 模板元编程在编译时计算类成员的偏移量 该类不是 POD 并且可以具有虚拟方法 基元和对
  • 滞后函数获取最后一个不同的值(redshift)

    我有如下示例数据 想要获得所需的 O P 请帮我一些想法 我希望第 3 4 行的 prev diff value 的 o p 为2015 01 01 00 00 00代替2015 01 02 00 00 00 with dat as sel
  • Spark Streaming kafka 偏移量管理

    我一直在做 Spark Streaming 工作 通过 kafka 消费和生成数据 我使用的是directDstream 所以我必须自己管理偏移量 我们采用redis来写入和读取偏移量 现在有一个问题 当我启动我的客户端时 我的客户端需要从
  • 第一个元素偏移量

    结构的第一个元素的偏移量为 0 是否是保证 为了更准确 让我们考虑一下 struct foo int a double b struct foo ptr malloc sizeof struct foo int int ptr ptr gt
  • 获取隐藏元素的偏移量

    如何获取隐藏元素的坐标 offset 不支持使用隐藏元素 有什么提示吗 如果你的元素有 hide 调用它 或者如果它有display none在CSS中 浏览器根本不费心去渲染它 在这种情况下 答案不是直接的 在最近的jQueries中 你
  • offsetParent 与parentElement 或parentNode 之间的区别

    我有以下 DOM 结构 div table table div
  • Android Studio 性能低下

    我昨天将 Android Studio 更新到了 2 3 版本 自从经历了性能缓慢之后 它之前工作得很好 虽然在 gradle 构建时很慢 但现在它占用了 100 的处理器资源 Android Studio 流畅运行需要什么条件 我的电脑运
  • AS2 使用 onEnterFrames 处理多个影片剪辑时减少延迟的最佳方法

    正如标题所述 我想知道同时处理舞台上多个影片剪辑的最佳方法是什么 每个影片剪辑都有自己的 onEnterFrame 函数 假设屏幕上同时有 50 个敌人 并且不断播放行走动画 onEnterFrame 函数将包括获取方向 移动 X Y 值
  • 按列分组的数据帧上 R 中的行之间的差异

    我希望通过 app name 获得不同版本的计数差异 我的数据集如下所示 app name version id count difference 这是数据集 data structure list app name structure c

随机推荐

  • urldecode 报错 Malformed UTF-8 characters, possibly incorrectly encoded

    使用urlencode 编码了一段字符串写入数据库 读取的时候使用urldecode 解码报错 Malformed UTF 8 characters possibly incorrectly encoded 解决方案 检查一下是否保存到数据
  • ajax不弹出新页面问题

    ajax默认是异步请求 做局部刷新的 指的是当前页数据渲染的 如果后台是转发或者重定向了 如果用ajax的话是不会弹出新的页面的 from提交的话 如果后台是转发或者重定向了 是可以打开新的页面的
  • 【人脸识别】【python】Object arrays cannot be loaded when allow_pickle=False解决方案

    2020年2月11日 0次阅读 共1625个字 0条评论 0人点赞 QueenDekimZ mtcnn debug 用mtcnn对LFW人脸数据集进行人脸检测与关键点对齐 并裁剪到160 160维 为后续facenet训练作training
  • wx.login wx.getUserProfile 获取登录凭证

    wx login 调用接口获取登录凭证 code 通过凭证进而换取用户登录态信息 包括用户在当前小程序的唯一标识 openid 微信开放平台帐号下的唯一标识 unionid 若当前小程序已绑定到微信开放平台帐号 及本次登录的会话密钥 ses
  • 通过hexo快速搭建个人博客

    个人博客预览点击这里 菜卷的博客 快速搭建一个博客 一 需要安装的工具 二 开始安装Hexo 三 安装完成后 初始化项目 四 在项目根目录下执行命令 五 启动项目 六 部署到github 七 配置文件 八 安装next主题 九 优化next
  • C语言程序实训--实验设备管理系统

    之前学校c语言程序实训课要求写的 如果程序有错误或可以改进的地方 希望各位指出 开发环境 IDE Visual Studio Code Dev C 处理器 AMD Ryzen 7 PRO 6850HS with Radeon Graphic
  • 73家!华为鸿蒙OS合作伙伴汇总

    6月2日 华为发布了最新版的鸿蒙操作系统 HarmonyOS 2 0 以及一系列搭载鸿蒙的硬件产品 比如手机 手表 平板 耳机 显示器等等 如今的智能终端越来越多 厂商不可能为每个设备单独准备一个系统 因为这不仅让开发者工作量倍增 消费者用
  • Flask网站中使用Keras时报错“Tensor Tensor(*) is not an element of this graph”

    HyperLPR车牌识别程序本地中能进行正常识别 但将其放到flask搭建的网站中进行识别 不能运行 并报错 Tensor Tensor is not an element of this graph HyperLPR中的识别模型采用的是K
  • Mask掩码

    Python中Mask的用法 引例 Numpy的MaskedArray模块 小于 或小于等于 给定数值 大于 或大于等于 给定数值 在给定范围内 超出给定范围 在算术运算期间忽略NaN和 或infinite值 All men are scu
  • Count Color

    http poj org problem id 2777 Description Chosen Problem Solving and Program design as an optional course you are require
  • 【QT】——布局

    目录 1 在UI窗口中布局 2 API设置布局 2 1 QLayout 2 2 QHBoxLayout 2 3 QVBoxLayout 2 4 QGirdLayout 注意 示例 Qt 窗口布局是指将多个子窗口按照某种排列方式将其全部展示到
  • Apifox—诠释国产接口管理工具新高度

    揭开Apifox的神秘面纱 曾经在对于接口管理和调试工作上 大量的开发者往往会选择使用Swagger做接口文档管理 用Postman做接口调试工具 然而这样使用的痛处其实也不言而喻 原本同一类型的工作却被放置在不同的软件工具上 并且对于接口
  • 图像二值化方法--OTSU(最大类间方差法)

    前面学习了直方图双峰法 图像二值化方法中的阈值法 最大类间方差法 OTSU 是找到自适应阈值的常用方法 原理参考了冈萨雷斯的 数字图像处理 以下是自己写的函数 获取灰度图in的OTSU阈值 int Segment otsuMat Mat i
  • [译] Scratch 平台的神经网络实现(R 语言)

    原文地址 Neural Networks from Scratch in R 原文作者 Ilia Karmanov 译文出自 掘金翻译计划 本文永久链接 github com xitu gold m 译者 CACppuccino 校对者 I
  • 【通信协议】笔记之Redis协议抓取分析

    RESP Redis序列化协议 概念 Redis底层使用的通信协议是RESP Redis Serialization Protocol的缩写 RESP协议可以序列化多种类型 比如Simple Strings 简单字符串 Errors 错误类
  • FreeRTOS记录(九、一个裸机工程转FreeRTOS的实例)

    记录一下一个实际项目由裸机程序改成FreeRTOS 以前产品的平台还是C8051单片机上面的程序 硬件平台改成了STM32L051 同时使用STM32CubeMX生成的工程 使用FreeRTOS系统 EEPROM数据存储读取函数修改更新 2
  • 数学建模第二天:数学建模工具课之MATLAB绘图操作

    目录 一 前言 二 二维绘图 1 曲线图 散点图plot 2 隐函数 显函数与参数方程的绘图 ezplot fplot 三 三维绘图 1 单曲线plot3 2 多曲线plot3 3 曲面 实曲面surf 网格曲面mesh 四 特殊的二维 三
  • 9.Linux虚拟机下Hive的安装配置

    hadoop 3 1 3 jdk 8u162 linux x64 apache hive 3 1 2 bin 本案例软件包 链接 https pan baidu com s 1ighxbTNAWqobGpsX0qkD8w 提取码 lkjh
  • 基于Python机器学习算法小分子药性预测(岭回归+随机森林回归+极端森林回归+加权平均融合模型)

    目录 前言 总体设计 系统整体结构图 系统流程图 运行环境 Python 环境 配置工具包 模块实现 1 数据预处理 2 创建模型并编译 3 模型训练 系统测试 工程源代码下载 其它资料下载 前言 麻省理工科技评论 于2020年发布了 十大
  • Kafka如何获取topic最近n条消息

    问题来源 项目运行中我们经常需要诊断个个环节是否正确 其中到kafka就需要查看最新的消息到达kafka没有 达到的内容是什么 这就需要查看kafka指定topic的最近的n条消息 将kakfa消息全部打印出来非常耗时而且不必要 当然我们可