是否可以使用 Kafka Streams 访问消息头?

2024-05-07

随着添加Headers http://apache.spinellicreations.com/kafka/0.11.0.0/javadoc/org/apache/kafka/common/header/Header.html到记录(生产者记录 https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html & 消费者记录 http://mirror.reverse.net/pub/apache/kafka/0.11.0.0/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)在 Kafka 0.11 中,使用 Kafka Streams 处理主题时是否可以获取这些标头?当调用类似方法时map on a KStream它提供了以下论据keyvalue的记录,但我无法看到访问headers。如果我们能的话那就太好了map超过ConsumerRecords.

ex.

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

从 2.0.0 版本开始可以访问记录标题(参见KIP-244 https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API了解详情)。

您可以通过处理器 API 访问记录元数据(即通过transform(), transformValues(), or process()),通过给定的“上下文”对象(参见https://docs.confluence.io/current/streams/developer-guide/processor-api.html#accessing-processor-context https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

Update

从 2.7.0 版本开始,处理器 API 得到了改进(参见KIP-478 https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API),添加一个新的类型安全api.Processor与 一起上课process(Record)代替process(K, V)方法。对于这种情况,标头(和记录元数据)可以通过Record class).

这个新功能是尚不可用在“DSL 的 PAPI 方法”中(例如KStream#process(), KStream#transform()和兄弟姐妹)。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上被 Streams 删除的标头。

但元数据在 DSL 级别不可用。然而,通过以下方式扩展 DSL 的工作也在进行中KIP-159 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以使用 Kafka Streams 访问消息头? 的相关文章

  • 如何在日期选择器中设置不在当前月份的单元格的样式

    我目前正在为我的 JavaFX 应用程序制作注册表 问题是 当日期选择器中的单元格不在页面的月份上时 我想让该单元格变灰 让我们看看我当前的日期选择器 我的日期选择器 正如您所看到的 我希望下个月的日期 27 日 28 日 30 日以及 1
  • 在 JTable 中移动行

    我使用 MVC 模式 并且有一个如下所示的 JTable List
  • 热重载在docker中运行的java程序

    我开发了一个java程序 应该在docker中运行 然而 我在调试docker中运行的java程序时遇到了很多痛苦 我在网上搜索 一些教程提出了像 spring dev tools 这样的工具 因为我的java程序是基于spring boo
  • 如何在 JFace 的 TableViewer 中创建复选框?

    我创建了一个包含两列的 tableViewer 我想将其中一列设为复选框 为此 我创建了一个 CheckBoxCellEditor 但我不知道为什么它不起作用 名为 tableName 的列显示其值正常 色谱柱规格如下 String COL
  • 如何在一行中将字符串数组转换为双精度数组

    我有一个字符串数组 String guaranteedOutput Arrays copyOf values values length String class 所有字符串值都是数字 数据应转换为Double QuestionJava 中
  • Java程序中的数组奇怪的行为[重复]

    这个问题在这里已经有答案了 我遇到了这个 Java 程序及其以意想不到的方式运行 以下程序计算 int 数组中元素对之间的差异 import java util public class SetTest public static void
  • 解决错误:日志已在具有多个实例的atomikos中使用

    我仅在使用atomikos的实时服务器上遇到问题 在我的本地服务器上它工作得很好 我在服务器上面临的问题是 init 中出错 日志已在使用中 完整的异常堆栈跟踪 java lang RuntimeException Log already
  • 如何查找 Android 设备中的所有文件并将它们放入列表中?

    我正在寻求帮助来列出 Android 外部存储设备中的所有文件 我想查找所有文件夹 包括主文件夹的子文件夹 有办法吗 我已经做了一个基本的工作 但我仍然没有得到想要的结果 这不起作用 这是我的代码 File files array file
  • 使用 ANTLR 为 java 源代码生成抽象语法树

    如何使用 ANTLR 从 java src 代码生成 AST 有什么帮助吗 好的 步骤如下 前往ANTLR站点 http www antlr org 并下载最新版本 下载Java g和JavaTreeParser g文件来自here htt
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • hibernate总是自己删除表中的所有数据

    您好 我正在开发一个 spring mvc 应用程序 它使用 hibernate 连接到存储文件的 mysql 数据库 我有两个方法 一个方法添加我选择的特定文件路径中的所有文件 另一种方法调用查询以返回从 mysql 存储的文件列表 问题
  • Microsoft Graph 身份验证 - 委派权限

    我可以使用 Microsoft Graph 访问资源无需用户即可访问 https developer microsoft com en us graph docs concepts auth v2 service 但是 此方法不允许我访问需
  • 检查 Android 手机上的方向

    如何查看Android手机是横屏还是竖屏 当前配置用于确定要检索的资源 可从资源中获取Configuration object getResources getConfiguration orientation 您可以通过查看其值来检查方向
  • 尝试使用 Ruby Java Bridge (RJB) gem 时出现错误“无法创建 Java VM”

    我正在尝试实现 Ruby Java Bridge RJB gem 来与 JVM 通信 以便我可以运行 Open NLP gem 我在 Windows 8 上安装并运行了 Java 所有迹象 至少我所知道的 都表明 Java 已安装并可运行
  • Java中未绑定通配符泛型的用途和要点是什么?

    我不明白未绑定通配符泛型有什么用 具有上限的绑定通配符泛型 stuff for Object item stuff System out println item Since PrintStream println 可以处理所有引用类型 通
  • Java - 不要用 bufferedwriter 覆盖

    我有一个程序可以将人员添加到数组列表中 我想做的是将这些人也添加到文本文件中 但程序会覆盖第一行 因此这些人会被删除 如何告诉编译器在下一个空闲行写入 import java io import java util import javax
  • 如何修复“sessionFactory”或“hibernateTemplate”是必需的问题

    我正在使用 Spring Boot JPA WEB 和 MYSQL 创建我的 Web 应用程序 它总是说 sessionFactory or hibernateTemplate是必需的 我该如何修复它 我已经尝试过的东西 删除了本地 Mav
  • java迭代器内部是如何工作的? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个员工列表 List
  • 中断连接套接字

    我有一个 GUI 其中包含要连接的服务器列表 如果用户单击服务器 则会连接到该服务器 如果用户单击第二个服务器 它将断开第一个服务器的连接并连接到第二个服务器 每个新连接都在一个新线程中运行 以便程序可以执行其他任务 但是 如果用户在第一个
  • javax.persistence.Table.indexes()[Ljavax/persistence/Index 中的 NoSuchMethodError

    我有一个 Play Framework 应用程序 并且我was使用 Hibernate 4 2 5 Final 通过 Maven 依赖项管理器检索 我决定升级到 Hibernate 4 3 0 Final 成功重新编译我的应用程序并运行它

随机推荐

  • AngularJS:在任何部分页面控制器之前调用特定函数

    我想调用一个特定的函数 GetSession 在我的应用程序加载开始时 这个函数使 http调用并获取会话令牌 GlobalSessionToken从服务器 然后 该会话令牌用于其他控制器逻辑并从服务器获取数据 我已经打电话给这个GetSe
  • 无法在 Google Cloud Function 中加载 node_modules(index.js,不在项目根目录上)

    因此 我需要部署 Google Cloud Function 以在 PostgreSQL 数据库 Cloud SQL 上发出选择请求 我需要使用 pg 模块 然后使用以下命令安装它 npm i pg 你需要知道我的项目目录是这样的 proj
  • 添加到 std::vector 的中间

    有没有办法将值添加到 a 的中间vector在 C 中 假设我有 vector
  • 在react中自定义useAxios钩子

    我正在使用 axios 和 React 所以我想为此编写一个自定义钩子 我这样做了 它工作正常 如下所示 const useAxios gt const response setResponse useState const error s
  • 弹性图表隐藏数据提示

    我们从多个源获取数据 并且某个日期的数据可能存在也可能不存在 因此 对于没有数据的点 我们发送 NaN 问题 在下面的代码中 有没有办法不显示那些为空的数据提示 我添加了一个数据提示功能 但它确实显示了一个小的空方块 是否有可能甚至不显示
  • VSCode 扩展的安全性和隐私性

    我发现 VSCode 有很多不错的扩展 然而 我担心这些扩展是否将我的代码发送到他们的任何服务器 有什么办法可以查到吗 我可以使用 fiddler 并隔离插件中可能发生的调用 但不想对我安装的每个扩展都这样做 VScode 团队对此有一些指
  • VS2015 nuget包管理器找不到包

    我安装了 VS2015 Update 2 现在 nuget 包管理器找不到 Microsoft 和 net 包源之外的任何包 看起来 nuget 包源已被删除 当我将其添加回 http www nuget org http www nuge
  • 外部“C”声明如何工作?

    我正在学习编程语言课程 我们正在讨论extern C 宣言 除了 它与 C 和 C 接口 之外 此声明如何在更深层次上工作 这对程序中发生的绑定有何影响 extern C 用于确保后面的符号不是mangled http en wikiped
  • 如何测量异步发电机所花费的时间?

    我想测量生成器花费的时间 阻塞主循环的时间 假设我有以下两个生成器 async def run for i in range 5 await asyncio sleep 0 2 yield i return async def walk f
  • 预注册 ATL 窗口类

    我在一个项目中使用了 ATL 和 WTL 的组合 并从中派生了我自己的类CWindowImpl 看起来像这样 class CMyControl public CWindowImpl
  • 浮点型、双精度型和十进制最大值与大小的关系[重复]

    这个问题在这里已经有答案了 我在 C 中遇到了这些数据类型的大小和最大值的令人困惑的模式 在使用 Marshal SizeOf 比较这些大小时 我发现了以下结果 Float 4 bytes Double 8 bytes Decimal 16
  • Symfony2 -> Twig -> 表单 -> 字段 -> 设置渲染 = true

    我有一个简单的问题 我有一个带有字段的表单 例如 builder gt add x gt add y gt add z 在我的树枝文件中 我使用了多个块 并且我想停止渲染字段 我查看了 b html twig 文件 a html twig
  • LibGDX dispose() 方法应该如何使用?

    我很不清楚如何dispose LibGDX 框架中的方法有效并且应该使用 据我所知 当你不需要某种资源后 你必须进行处置以确保你的程序运行最佳 我正在开发一个移动应用程序 并且我有一个AssetManager在启动时在特殊指定的加载屏幕中加
  • 消息:Hive 架构版本 1.2.0 与 Metastore 的架构版本 2.1.0 不匹配 Metastore 未升级或损坏

    环境 spark2 11 hive2 2 hadoop2 8 2 hive shell 运行成功 并且没有错误或警告 但是当运行application sh时 启动失败 usr local spark bin spark submit cl
  • Android 中如何获取帧

    实际上 我需要从视频中获取所有帧 但在使用 Mediametadataretriever 缩略图 时间戳获取帧时 我经常重复获取第一帧 然后获取特定时间帧 我通过更改所有 GetFrameAtTime options 尝试了很多修复 但仍然
  • 如何避免javascript中for循环内的for循环

    我已经编写了一段运行良好的代码 我想要一个新数组 其中包含 myArr 中的元素 按照 orderArr 中指定的顺序 但是 它在另一个 for 循环中使用 for 循环来匹配数组元素 var myArr a b c d e var ord
  • 从所有通讯组中删除所有前雇员

    因此 今天我被分配的任务是从所有 DL 中删除域中的所有前员工 他们在 AD 中拥有自己的文件夹 有没有什么方法可以快速做到这一点 或者至少比单独检查每个并转到 gt 的成员删除所有更快 Thanks 编辑以添加更多信息 有 822 个用户
  • API 错误 (500):清单未知:清单未知

    it failes to pull the image with SHA256 digest identifier 不幸的是 这是 DockerHub 消除 Docker 1 9 守护进程的向后兼容性的副作用 当使用 Docker 1 10
  • PHP 命名空间和 use 即使我已经使用 use 指定了类,也找不到致命错误类

    我在 PHP 中的名称空间方面遇到了麻烦 例如我有一个这样的文件 namespace App Models Abstracts abstract class Country 然后是另一个像这样的文件 namespace App Models
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录