如何在KafkaStream应用程序中获取partitionId和TopicName

2024-04-11

我们如何从 KafkaStream 获取主题名称和分区 id。对于任何其他 Kafka 消费者,我们可以获得主题名称和分区 ID,如下所示:

    ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",record.key(), record.value(), record.partition(), record.offset());}

不知道如何获取 KafkaStreams 中的记录引用。


您可以通过以下方式获取输入记录的元数据ProcessorContext在处理器 API 中公开。您可以通过以下方式将处理器 API 嵌入到 DSL 中transform()和类似的方法。

查看文档了解详细信息:https://docs.confluence.io/current/streams/developer-guide/processor-api.html#accessing-processor-context https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在KafkaStream应用程序中获取partitionId和TopicName 的相关文章

  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • 如何使用不同的kafka主题配置Kubernetes部署的微服务的每个pod/进程?

    在我们的应用程序中 有多个不同 kafka 主题的消费者 例如 Cosumer C1 Cosumer C2 Cosumer C3 Cosumer C4 Cosumer C5 以及不同的 kafka 主题 例如主题 1 主题 2 主题 3 主
  • 使用kafka lib反序列化PRIMITIVE AVRO KEY

    我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
  • 如何连接Kafka和Elasticsearch?

    我是Kafka的新手 我使用kafka通过logstash收集netflow 可以 并且我想将数据从kafka发送到elasticsearch 但是存在一些问题 我的问题是如何将 Kafka 与 Elasticsearch 连接起来 net
  • 带有 Kafka 消费者的 Spring Boot 作业调度程序

    我正在开发一个 POC 我想使用来自 Kafka 主题 用户 的消息 尝试实现消费者应该从 Kafka 主题读取消息 一旦 spring boot 调度程序在预定时间或 cron 时间触发 那么我们应该开始从 kafka 主题中一一消费现有
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate

随机推荐

  • 需要 bash 脚本和 awk 命令的帮助

    我需要一个 bash 脚本来递归打印文件夹名称和文件名 在 stdo p 中 例如 我有一个类似 earth plants flowers rose rose jpg 的文件夹结构 earth plant fruits apple appl
  • 将鼠标悬停在 XAML wpf 上时将按钮默认突出显示颜色更改为透明?

    我有一个带有透明背景的按钮 当我将鼠标移到按钮上时 按钮上明显会出现浅蓝色 默认颜色 我想要的是 即使鼠标位于按钮上方 我的按钮背景也应保持透明 我怎样才能在 XAML 中做到这一点 我已经搜索过但找不到与我的问题相关的任何内容 几乎每个示
  • 根据传递给控制器​​的参数动态创建查询

    在我的任务管理应用程序中 用户应该能够根据以下条件过滤任务 assignedTo priority status and or dueDate 我不确定如何创建动态查询 因为它将根据可用参数构建查询 例如 如果我有一个网址 例如 task
  • 如何将时间值的 NSString 表示形式转换为包含小时和分钟的两个 NSInteger?

    我正在深入研究 iOS 开发和 Objective C 语言 并正在构建一个闹钟应用程序以熟悉 SDK 和语言 我有一个NSString代表时间的对象 其范围 1 00 am to 12 59 am 我需要转换这个NSString一分为二N
  • data-reactroot 与 React 中的 Hydro 函数相关吗?

    我试图理解两者之间有什么区别ReactDOMServer renderToString and ReactDOMServer renderToStaticMarkup 在 React 16 8 6 上 这是我的理解 renderToStat
  • nosql 是什么意思?有人可以用简单的话向我解释一下吗?

    在这篇文章中堆栈溢出架构 http highscalability com stack overflow architecture我读到了一些叫做 nosql 的东西 我不明白它的意思 我试图在谷歌上搜索 但接缝我无法确切地了解它是什么 谁
  • 拖动手势活动时未引发 JavaFX KeyEvent

    所以我的问题是这样的 我正在实现一个 UI 创建工具 需要使用边缘上的拖动手势来调整元素的大小 当在检查是否按下修饰键时执行此拖动 即实现统一缩放 时会出现问题 当拖动手势处于活动状态时 永远不会引发键事件 因此我无法在拖动期间激活 停用此
  • Python 内置函数“compile”。它是干什么用的?

    我遇到了一个内置函数compile http docs python org 2 7 library functions html compile今天 虽然我阅读了文档 但仍然不明白它的用法或适用的地方 请任何人都可以举例说明此功能的使用
  • Python + Twisted + sqlanydb = abort()

    我通过官方 sqlanydb 驱动程序将 Twisted 11 与 SQLAnywhere 12 一起使用 一般来说 它工作得很好 但有时应用程序会因第一个查询中止而崩溃 如果一个查询有效 那么接下来的所有查询也都有效 然而我的测试很少通过
  • 如何检查数组是否多次具有值

    我想看看一个数组是否多次具有相同的值 例如 array array val1 val2 val3 val1 如您所见 在上面的数组中 有 2 x val1 要搜索数组是否包含值 我可以使用 in array 来完成 search in ar
  • PHPMyAdmin / MySql - 添加 ID 字段并自动填充 ID 号

    我有一个非常大的数据库表 近 2000 万条记录 这些记录没有唯一的 ID 号 所以 我插入了新字段 现在 我想用 ID 号填充它 从第一个 ID 号 10 000 001 开始增加 1 仅供参考 我在本地计算机上使用 WAMP 并且我已将
  • 在 keyup 事件上跳过 Primefaces 输入文本的验证,但在提交时验证

    我对输入文本有两个要求 p inputText 的值应立即通过 keyup event 显示在屏幕上的 h outputText 中 该值在数据库中应该是唯一的 我正在使用 Primefaces 4 0 JSF 2 2 以及 Glassfi
  • 将 DBF 文件导入 Sql Server

    我需要一些帮助来解决这个问题 因为我是存储过程的新手 我正在尝试使用此存储过程将 DBF 表导入到 Sql Server 2008 中 CREATE PROCEDURE spImportDB Add the parameters for t
  • 使用异常映射器的 JAX-RS

    我读到我可以创建一个实现javax ws rs ext ExceptionMapper它将把抛出的应用程序异常映射到Response目的 我创建了一个简单的示例 如果在保留对象时电话长度大于 20 个字符 该示例将引发异常 我期望异常映射到
  • 如何调整flexdashboard中的表格高度?

    我有一个 Flexdasboard 其中一个页面包含 1 个绘图 然后在其下面有一个表格 该表当前已被压缩 因此虽然它显示 25 行 但它们都在滚动选项中 因此您一次只能查看其中 2 行 我怎样才能改变这个 我目前正在使用以下代码进行编码
  • FluidPage 中的框,基本闪亮

    是否可以在经典闪亮应用程序中使用 box 元素 作为经典应用程序 我的意思是不是闪亮的仪表板 是的 这是可能的 你可以使用使用Shinydashboard https www rdocumentation org packages shin
  • 如何设置rdlc或ssrs报告中表格的最小行数?

    假设我想在表中设置最小行 当我的数据在表中未满时 只需插入空白行即可完全填充它 这个问题看起来很简单 但找到解决方案确实很难 所以我需要在这里分享一下 1 在表格中设置标题和1个绑定明细行 2 在表中插入空白行 只要您想要填充该行 组外 3
  • TCP 套接字到 Websocket?

    那里有很多 websocket gt 套接字包装器 比如网络套接字 https github com kanaka websockify 但是有相反的可用吗 具体来说 我希望能够使用应用程序连接到 TCP 套接字 并将代理转换为 webso
  • 在其他类构造函数中使用参数化构造函数

    我担心这是一个非常基本的问题 但是我还无法解决它 我有一个class A classA h class ClassA public ClassA ClassA int foo private int foo classA cpp Class
  • 如何在KafkaStream应用程序中获取partitionId和TopicName

    我们如何从 KafkaStream 获取主题名称和分区 id 对于任何其他 Kafka 消费者 我们可以获得主题名称和分区 ID 如下所示 ConsumerRecords