如何从 Kafka 主题获取最近的消息

2024-01-20

我们是否有任何选项,例如从 Kafka 主题获取最近 10/20 等消息。我可以看到 --from-beginning 选项从主题中获取所有消息,但如果我只想获取第一个、最后一个、中间或最新的几条消息 10. 我们有一些选择吗?


前 N 条消息

您可以使用--max-messages N为了获取第一个N某个主题的消息。

例如,要获取前 10 条消息,请运行

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  --max-messages 10

下 N 条消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --max-messages 10

最后 N 条消息

要获取最后 N 条消息,您需要定义特定的分区和偏移量:

bin/kafka-simple-consumer-shell.sh --bootstrap-server localhost:9092 --topic test--partition testPartition --offset yourOffset

M 到 N 条消息

同样,对于这种情况,您必须定义分区和偏移量。 例如,您可以运行以下命令来获取从您选择的偏移量开始的 N 条消息:

bin/kafka-simple-consumer-shell.sh --bootstrap-server localhost:9092 --topic test--partition testPartition --offset yourOffset --max-messages 10

如果您不想坚持使用二进制文件,我建议您使用kt https://github.com/fgeller/kt这是一个具有更多选项和功能的 Kafka 命令行工具。


更详细的内容可以参考文章如何在 Apache Kafka 中获取特定消息 https://betterprogramming.pub/how-to-fetch-specific-messages-in-apache-kafka-4133dad0b4b8

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Kafka 主题获取最近的消息 的相关文章

  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 使用Spring Cloud Stream Kafka动态更改instanceindex

    如同 在运行时更改 spring cloud stream 实例索引 计数 https stackoverflow com questions 37579939 changing spring cloud stream instance i
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • 无法找到任何实现 Connector 且名称与 io.debezium.connector.mysql.MySqlConnector 匹配的类,可用的连接器有

    使用 Kafka MySQL 和 Debezium 设置数据流管道 我是这个版本的 Kafka 3 4 0 MySQL 8 Debezium 2 2 1 Java 11 目标 我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kaf
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • TopologyTestDriver 在 KTable 聚合上发送错误消息

    我有一个聚合在 KTable 上的拓扑 这是我创建的通用方法 用于根据我拥有的不同主题构建此拓扑 public static
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk

随机推荐

  • GitHub SSH 密钥声称未使用

    为什么 在我的 GitHub 帐户上的 设置 gt SSH 密钥 下 它显示 由 GitHub for Mac 于 2014 年 10 月 24 日添加 从未使用过 没用过 我用过 我的个人资料中显示了很多贡献 从that机器 我还有另一把
  • Android:使用 onTouchListener() 循环执行线程

    您好 我的应用程序中有 8 个按钮 每个按钮都配置为 onclickListener 当单击该按钮时 字符串将写入套接字 现在我希望当我按住按钮时 字符串必须循环写入 这就是我正在尝试做的事情 bLeft setOnTouchListene
  • 在 Ubuntu 上安装 Java 7

    Note 这个问题是在 Oracle 将 OpenJDK 作为 Oracle JDK 的免费版本之前提出的 历史答案反映了这一点 从 2022 年起 您不应使用 Java 7 除非您必须使用无法在 OpenJDK 8 上运行的项目 为了安装
  • ELK 未将元数据从 filebeat 传递到 Logstash

    通过以下方式安装 ELK 服务器 https www digitalocean com community tutorials how to install elasticsearch logstash and kibana elk sta
  • R 编程:从数据框中查找所有因子

    我正在尝试获取数据框列的类类型 我正在做的是 sapply mydata class 但现在 我只想找到那些作为因素的列名 我尝试了以下方法 sapply data is factor 但它给了我 ResponseFlag Gender M
  • ANTLR 隐式乘法

    我是 ANTLR 的新手 我正在尝试扩展所提供的简单计算器的示例here https stackoverflow com a 1932664 具体来说 我尝试添加一些简单的函数 负数等 以熟悉 ANTLR 然而 我在尝试实现 隐式 乘法时遇
  • 如何收集与输入函数匹配通配符的Snakemake输入文件?

    我有一组使用 BWA MEM 生成并使用 GATK IndelRealigner 等进一步处理的 BAM 文件 我正在以较小的块对 BAM 文件进行预处理 以加快处理速度 然而 我必须在变体调用之前将这些单独的文件合并到一个 BAM 文件中
  • 为什么我不能从互斥锁中可变地借用单独的字段? [复制]

    这个问题在这里已经有答案了 尝试通过以下方式获取对单独字段的可变引用MutexGuard struct MyObject pub a i32 pub b i32 fn func 1 mtx Mutex
  • x86_64 执行 Shellcode 失败:

    我在 64 位 Linux 上使用 Python 2 7 我有以下 Python 脚本 应该执行一个简单的 Hello World shellcode import urllib2 import ctypes shellcode xb8 x
  • Dynamic_cast 不适用于非多态类型的原因

    有课B和派生类D class B int b class D public B int d D d new D B b dynamic cast
  • python 组合数据框中的行并将值相加

    我有一个数据框 Type Volume Q 10 Q 20 T 10 Q 10 T 20 T 20 Q 10 我想将类型 T 合并到一行中 并且仅当两个 或更多 T 连续时才添加音量 即 Q 10 Q 20 T 10 Q 10 T 20 2
  • 我如何近似“你的意思是?”不使用谷歌?

    我知道这个问题重复 谷歌 你是说吗 是怎么回事 算法工作 https stackoverflow com questions 307291 how does the google did you mean algorithm work 如何
  • 错误“virtualenv:找不到命令”,但安装位置位于 PYTHONPATH 中

    在过去的两天里 这让我发疯 我在 Macbook 上安装了 virtualenvpip install virtualenv 但是当我尝试使用创建一个新的 virtualenv 时virtualenv venv 我收到错误消息 virtua
  • 如何使用 Java/Swing 旋转图像,然后将其原点设置为 0,0?

    我能够旋转已添加到 JLabel 的图像 唯一的问题是 如果高度和宽度不相等 旋转后的图像将不再出现在 JLabel 的原点 0 0 处 这就是我正在做的事情 我还尝试使用 AffineTransform 并旋转图像本身 但结果相同 Gra
  • 在 WPF DataGrid 中使用 Enter 键作为 Tab

    我有一个DataGrid in WPF I want to move to the NextCell when i hit Enter and when the LastColumn is reached it should have th
  • Android Studio - 恐慌:无法打开 AVD

    经过几个小时修复 Gradle 问题后 我能够在 Android Studio 中构建我的测试应用程序 但是当我尝试在 AVD 中运行它时 它就是打不开 这是日志 Waiting for device C Users Rahaman App
  • 如何在不使用 len 的情况下知道列表是否仅包含 1 个元素

    我想知道列表是否只包含一个元素 而不使用len 在这两种解决方案之间 最Pythonic的方法是什么 或者也许这些都不是Pythonic的 如果是的话那又是什么 解决方案a 删除位置1处的项目 除了IndexError所以我知道只有 1 件
  • Python 求解一个变量的方程

    我正在尝试使用 SymPy 求解 python 中的方程 我有一个生成的方程 类似于function y 8 0 y 3 0 我将其与 SymPy 一起使用来创建一个如下所示的新方程 eq sympy Eq function 2 哪个输出y
  • 如何反序列化动态Json对象?

    我目前从我的 api 收到以下 JSON 响应 Lastname ERRLASTNAMEEMPTY Firstname ERRFIRSTNAMEEMPTY 请注意 上述响应是动态的 即有时我可以有名字 有时可以有姓氏 有时两者都有 此响应基
  • 如何从 Kafka 主题获取最近的消息

    我们是否有任何选项 例如从 Kafka 主题获取最近 10 20 等消息 我可以看到 from beginning 选项从主题中获取所有消息 但如果我只想获取第一个 最后一个 中间或最新的几条消息 10 我们有一些选择吗 前 N 条消息 您