Kafka Streams如何获取kafka headers

2023-11-22

我有下面的卡夫卡流代码

    public class KafkaStreamHandler implements  Processor<String, String>{

    private ProcessorContext context;


        @Override
    public void init(ProcessorContext context) {
        // TODO Auto-generated method stub
        this.context = context;
    }

    public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {

        Headers contexts = context.headers();

        contexts.forEach(header -> System.out.println(header));
     }

public void StartFailstreamHandler() {
       StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "500");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //consumer_timeout_ms
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

        props.put("state.dir","/tmp/kafka/stat));

     userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

     /* take few descsion based on Header */
     /* How to get the Header */ 

       userStream.map(this::process);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);


kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });


        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    }

现在我们的一个客户端正在发送有关 kafka 标头的版本信息,如下所示。

ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
record.headers().add(new RecordHeader("version", "v1".getBytes()));
producer.send(record);

基于此标头,我需要为我的消息选择解析器,如何使用 KStream 运算符读取此标头? 我已经看过流的所有 API,但没有方法给出 header

我无法更改为普通的 kafka 消费者,因为我的应用程序已经依赖于几个 KStream API ..


处理器不允许您在下游 DSL 中链接新运算符,您应该使用 TransformValues,以便可以继续使用 Stream DSL:

  1. 首先从 ValueTransformerWithKey 中提取 headers
public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {

    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        Headers headers = context.headers();
        /* take few descsion based on Header: if you want to filter base on then just return null then chaining another filter operator after transformValues*/
        /* How to get the Header */
        return value;
    }

    @Override
    public void close() {

    }
}
  1. 将 ExtractHeaderThenDoSomethingTransformer 添加到您的拓扑中,如下所示:
userStream
        .transformValues(ExtractHeaderThenDoSomethingTransformer::new)
        .map(this::processs);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams如何获取kafka headers 的相关文章

  • 如何知道 HTTP 服务器何时完成发送数据

    我正在开发一个面向浏览器 代理的项目 我需要下载网页 向 Web 服务器发送自定义 HTTP 请求后 我开始监听服务器响应 读取响应时 我检查响应标头中的 Content Length row 如果我得到其中之一 很容易确定服务器何时完成发
  • 无法执行目标 org.codehaus.mojo:exec-maven-plugin:1.5.0:exec

    三周前 我完成了一个网络应用程序的工作 一切都运行没有问题 现在 三周后 没有任何更改 我想再次运行该应用程序 但这次我收到以下错误消息 An error occurred while parsing the server response
  • 为 JSP 创建注销链接?

    当用户登录我的应用程序时 他提交一个要通过 Servlet 处理的表单 servlet 为用户创建一个会话 我如何创建一个链接以便用户可以注销 我似乎无法直接链接到 Servlet 如何删除会话并链接回主页 HttpSession sess
  • SharePoint 2010 Web 服务上的 Java JBoss 401 错误

    我的代码在 Eclipse IDE 中测试时运行成功 我正在使用生成的 Copy wsdl 通过 Web 服务连接到 MS SharePoint 2010 当我在 JBoss 服务器上部署代码 运行 Adob e LifeCycle 时 我
  • 有没有一种干净的方法将泛型类型的类分配给变量?

    鉴于此代码 List
  • 从 Windows Batch (cmd.exe) 中的文件读取环境变量

    我正在尝试从批处理文件中读取变量 以便稍后在批处理脚本 Java 启动器 中使用 理想情况下 我希望所有平台 Unix Windows 上的设置文件都具有相同的格式 并且也是有效的 Java 属性文件 也就是说 它应该看起来像这样 sett
  • Netbeans 雷达插件配置

    我使用的是 Netbeans 8 0 1 在提交到 SVN 之前 我需要从 IDE 运行并检查 SonarQube 分析 我已经安装了 Netbeans Radar 插件 用于启动本地分析并检查结果 这个插件有一个名为 Get Issues
  • 获取文本文件中行的字节偏移量?

    我有一个文本文件 例如 one two three four five 我需要获取文件中每一行的偏移量 我如何在 Java 中做到这一点 我搜索了一些 I O 库 如 BufferedReader 和 RandomAccessFile 但我
  • 如何增加使用 SAX 解析 XML 文件的entityExpansionLimit

    我正在尝试使用 Java 中的 SAX 解析器解析一个 1 23 GB 的 XML 文件 我使用的是 Mac 操作系统和 JDK 1 7 0 51 不幸的是 我收到以下错误 The pasrser has encountered more
  • Runtime.getRuntime().exec(cmd) 挂起

    我正在执行一个命令 该命令返回文件的修订号 文件名 但如果执行命令时出现问题 应用程序就会挂起 我可以做什么来避免这种情况 请在下面找到我的代码 String cmd cmd C si viewhistory fields revision
  • 使用 PowerMock 和 TestNG 模拟单个静态方法

    class StaticClass public static String a return a public static String ab return a b 我想嘲笑StaticClass a以便它返回 x 并致电StaticC
  • 无法从 PDFA1-a 格式文档中提取图像

    我正在使用以下代码从 PDFA1 a 格式的 pdf 中提取图像 但我无法获取图像 List
  • 如何修复运行 Android 模拟器时出现 GPU Driver Issue 错误

    我的 Android 模拟器几周前运行良好 但现在出现错误 当我运行代码时 GPU 驱动程序问题错误对话框与模拟器一起弹出 当我单击 确定 时 Android 模拟器不会按预期运行应用程序 错误如下 Your GPU driver info
  • 具有多个字符串的列表视图

    我正在尝试创建一个包含多个字符串的列表视图 现在我有一个可以实现的功能 while i lt 10 GETS DATA FROM WEBPAGE ETC a DATAFROMWEBPAGE1 b DATAFROMWEBPAGE2 c DAT
  • toArray 与预先确定大小的数组

    使用时ar toArray new String ar size 安卓工作室3 2 1警告预先确定大小的数组并建议空数组 有两种方式将集合转换为数组 使用 预先确定大小的数组 如 c toArray new String c size 或使
  • 如何查找类路径中具有指定名称的所有资源?

    我想列出类路径中具有特定名称的所有文件 我预计会发生多次 因此Class getResource String 不管用 基本上 我必须识别类路径中任何位置具有特定名称 例如 xyz properties 的所有文件 然后累积读取其中的元数据
  • 枚举

    我试图拥有一组扩展通用接口的枚举 例如 interface Fooable void someCommonMethod enum E1 implements Fooable some enumuerations and a definiti
  • 如何将 printStackTrace() 中的异常写入 Java 中的文本文件?

    我需要用 Java 捕获文本文件中的异常 例如 try File f new File catch FileNotFoundException f f printStackTrace instead of printing into con
  • Java Media API:java media api 下载

    我在哪里可以找到javax media jar 文件 在sun站点它下载一个安装程序 有没有可用的java媒体jar 没有 javax media 具体是 jar 文件 该包位于 jmf jar 文件中 您需要运行安装程序并取出 jar 或
  • 项目级别的@PowerMockIgnore

    在 Maven 中运行时 我的 powermock 测试用例出现以下错误 java lang LinkageError loader constraint violation loader instance of org powermock

随机推荐

  • Node.js MySQL 模块 - 抛出错误; // 重新抛出非 MySQL 错误;

    今天我尝试了来自 w3schools 的 node js mysql 片段 var mysql require mysql var con mysql createConnection host localhost user roots W
  • 如何在 Coq 中使用归纳类型来处理案例

    我想使用destruct通过案例来证明陈述的策略 我在网上读了几个例子 但我很困惑 有人可以更好地解释一下吗 这是一个小例子 还有其他方法可以解决它 但尝试使用destruct Inductive three zero one two Le
  • Visual Studio C++ 是否可以在不链接的情况下编译对象

    我正在运行 VS 2010 SP1 并且有一个每周运行一次的特殊分析配置 因为构建服务器需要很长时间来分析所有内容 我希望此配置无需链接即可运行 如果分析通过了项目中的所有代码 那么我希望构建继续进行下一个项目而不链接 我看不出有什么方法可
  • Python套接字接受块-防止应用程序退出

    我编写了一个非常简单的 python 类 它等待套接字上的连接 目的是将此类粘贴到现有应用程序中 并将数据异步发送到连接的客户端 问题是 当等待 socket accept 时 我无法通过按 ctrl c 来结束我的应用程序 我也无法检测到
  • JDBC 到 Spark Dataframe - 如何确保均匀分区?

    我是 Spark 新手 正在致力于通过 JDBC 从 Postgres 数据库表创建 DataFrame 使用spark read jdbc 我对分区选项有点困惑 特别是分区列 下界 上限 and 分区数 文档似乎表明这些字段是可选的 如果
  • JSON - Spring MVC:如何将 json 数据发布到 spring MVC 控制器

    我在发布 JSON 数据时遇到问题jsp to controller 每次我尝试都会收到 ajax 错误Bad Request 我对 JSON 很陌生 我真的不知道我做错了什么 我搜索并尝试了一些可以在该网站中找到的示例 但仍然遇到问题 在
  • 使用 JAX-WS:如何设置用户代理属性

    我对此进行了搜索并发现了一些未遂事件 我创建了一个 java 客户端来使用 JAX WS 来使用 Web 服务 使用 JAX 时有没有办法设置 HTTP USER AGENT 值 我希望在特定客户端 我的 访问它时获得我的网络服务日志 因此
  • 检测何时连接新显示器

    我正在编写一个需要两个显示器的应用程序 一个用于控制面板 另一个用于输出 我所拥有的是这样的 如果只有一个显示器 应用程序会在其上显示两种表单 但如果有两个显示器 则输出表单将转到另一个 问题是这只在应用程序启动时才会发生 换句话说 如果应
  • 在jsf中使用json将数据从bean发送到javascript

    我想将我的数组列表从 ManagedBean 发送到 JavaScript 代码 我的豆子在这里 public void getDataAsJson String dizi Tokyo Jakarta New York Seoul Mani
  • 如何计算列的平均值,然后将其包含在oracle中的选择查询中?

    我的桌子是 create table mobile id integer m name varchar 20 cost integer 其值为 insert into mobile values 10 NOkia 100 insert in
  • jqplot - 单个值,而不是堆积图中的总计

    In a stacked bar chart we can show total of each series in every stack like this However I want value of each series to
  • Identity.EntityFramework OnModelCreating 是如何调用的

    我正在从事两个类似的项目 但我没有创建其中任何一个 它们都具有相同的本地上下文 如下所示 using Microsoft AspNet Identity EntityFramework public class LocalContext I
  • 如何将uuid存储为数字?

    根据问题的回答 MySQL 中的 UUID 性能 回答者建议将 UUID 存储为数字而不是字符串 我不太确定如何做到这一点 有人可以建议我一些东西吗 我的 ruby 代码如何处理这个问题 如果我理解正确的话 您在主列中使用 UUID 吗 人
  • 如何将 QBASIC PLAY 命令转换为更现代的命令?

    我的 QB 应用程序中有这样的播放命令 PLAY MSe8f 4f 8f 8g8a8b4 a4 g4 f 4 o0b8o1e8e8e4d8e2 我想以某种方式将它们转换为现代应用程序可以使用的东西 有什么想法吗 我目前正在 FreeBasi
  • min_member/2 的反直觉行为

    最小成员 分钟 列表 当 Min 是标准项顺序中最小的成员时为真 如果列表为空 则失败 min member 3 1 2 X X 3 当然 解释是变量在术语的标准顺序中位于所有其他术语之前 并且使用统一 然而 所报告的解决方案感觉有些错误
  • 如何将查询结果映射到 sqlalchemy 中的自定义对象?

    我正在寻找一种方法来告诉 sqlalchemy 将某些 tabes 上的复杂查询映射到自定义类MyResult而不是默认的RowProxy班级 这是一个简单的工作示例 create table foo id integer title te
  • itunesconnect apploader 无效段对齐问题

    伙计们 我想更新我的应用程序最新版本 但应用程序加载器一直给我同样的错误 那就是 错误 ITMS 9000 段对齐无效 此应用程序没有正确的段对齐 应使用最新版本的 Xcode 重新构建 如果您需要进一步帮助 请联系开发者技术支持 我快要疯
  • 防止 ProgressDialog 被 onClick 关闭

    我使用 ProgressDialog 向用户表明他必须等待 并在用户必须等待时使我的应用程序的表面 不可触摸 我向 ProgressDialog 添加了一个按钮 如果某些条件成立 它应该启动一些操作 问题是每次用户按下按钮时 progres
  • Java滑动JPanels

    我有一个显示各种按钮的菜单 我可以让按钮在单击时调用它们各自的 JPanel 问题是我想让 Jpanel 在调用时滑入 而不是立即弹出 我尝试使用补间引擎 作为 Java 初学者 我发现它真的让人不知所措 所以我决定使用定时动画 我能够使顶
  • Kafka Streams如何获取kafka headers

    我有下面的卡夫卡流代码 public class KafkaStreamHandler implements Processor