使用 Spring Kafka 添加自定义标头

2023-12-31

我计划使用 Spring Kafka 客户端在 Spring Boot 应用程序中使用 kafka 设置并生成消息。我看到 Kafka 0.11 中对自定义标头的支持详情here https://issues.apache.org/jira/browse/KAFKA-4208。虽然它可用于本机 Kafka 生产者和消费者,但我没有看到在 Spring Kafka 中添加/读取自定义标头的支持。

我正在尝试根据重试计数实现消息的 DLQ,我希望将重试计数存储在消息标头中,而无需解析有效负载。


当我偶然发现这个问题时,我正在寻找答案。不过我正在使用ProducerRecord<?, ?>类而不是Message<?>,所以标头映射器似乎不相关。

这是我添加自定义标头的方法:

var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);

现在为了读取标头(在使用之前),我添加了一个自定义拦截器。

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));

        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

最后一点是使用以下(Spring Boot)配置向 Kafka Consumer Container 注册此拦截器:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Spring Kafka 添加自定义标头 的相关文章

随机推荐

  • 如何在 F# 中实现 beta 缩减函数?

    我正在用 F 编写 lambda 演算 但我一直坚持实现 beta 约简 用实际参数替换形式参数 lambda x e f gt e f x 使用示例 lambda n n 2 3 7 gt n 2 3 7 n gt 7 2 3 所以我很想
  • html如何将H1、H2等设为链接?

    将 h1 h2 等标题转换为链接的正确代码是什么 search engines标题和链接的索引文本 Is it a href h1 heading h1 a or h1 a href heading a h1 谁能解释为什么 每这里 htt
  • 我可以使用资源字符串作为包名称吗?

    这样的事情可能吗
  • Vaadin 23 错误:找不到模块“@vaadin/build-status-plugin”

    我正在尝试从 Vaadin 18 升级到 Vaadin 23 执行升级说明中列出的步骤后 我在获取前端构建时遇到了大量问题 最新的阻止程序是启动应用程序后 npm 运行 但我看到以下错误 我检查了node modules文件夹 build
  • Go模板和函数

    在我的 go 代码中我经常使用if像这样 if user user Registered go 模板中的等效代码是 if and User User Registered end 不幸的是 如果模板中的代码失败 User is nil 在g
  • 将 NSDecimalNumber 转为负数

    我正在寻找一种方法来扭转NSDecimalNumber乘以负数 1 decNumber is the one I would like to turn negative NSDecimalNumber decNumber values ob
  • getter 是否应该返回对象实例的副本以避免副作用?

    我想获取从类的函数返回的值 在我的班级里 public class MyClass private Color color new Color 0f 0f 0f 1f public Color getColor return this co
  • 多维 np.argmax?

    我有一个形状为 n n g 的 3D 数组 并且我需要每个 n n argmax 即结果应该是每个长度为 g 的两个索引向量 x y 直观的解决方案是 array np random uniform size 5 5 1000 np arg
  • Node.js、(Hi)Redis 和 multi 命令

    我正在使用 node js 和 redis 并通过此命令安装了hiredis 库 npm install hiredis redis 我在这里查看了多个示例 https github com mranney node redis blob
  • 使用调查权重时如何为 Logit 模型生成边际效应?

    我通常使用 mfx 包和 logitmfx 函数生成 logit 模型边际效应 然而 我当前使用的调查具有权重 由于某些人群中的过度采样 这对样本中 DV 的比例有很大影响 而 logitmfx 似乎没有任何方法包含权重 我已经用 svyg
  • PhantomJs 脚本中的 Ajax 请求

    Problem phantomJs 脚本中对本地页面的 Ajax 请求不起作用 无响应 问题 我怎样才能让它发挥作用 有什么想法或可能的解决方案吗 描述 我正在运行 phantomJs 脚本 我需要访问另一个页面 本地 中的 php 函数提
  • 如何从 Java Web Start (JDK 8) 升级到 jlink (JDK 9+) 以实现自动更新应用程序?

    Java 8 及之前的版本有Java网络启动 https www java com en download faq java webstart xml 当我们更改应用程序时 它会自动更新应用程序 Oracle 建议用户迁移到jlink ht
  • 网络模拟器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 您是否知道能够模拟网络条件 例如带宽受限 延迟 端口关闭 拥塞 冲突 的软件 特定于平台很好 但我希望
  • Facebook JavaScript SDK 是否有 TypeScript 接口定义

    TypeScript 包包含 Node js 和 jQuery 的接口定义 我想知道是否有人已经为 Facebook JavaScript SDK 编写了定义 你可以用tsd https github com DefinitelyTyped
  • Eclipse RCP 应用程序:如何禁用特定扩展?

    我正在使用 Eclipse HELIOS 编写富客户端应用程序 以下条目会自动添加到我的应用程序 视角菜单 中 Java Java 浏览 Java 类型层次结构 团队同步 我需要摆脱他们 我正在使用扩展点 org eclipse ui ac
  • SQL Server SP_SEND_DBMAIL 图像文件附件

    我正在表上使用触发器来使用 sp send dbmail 发送电子邮件 我想在图像类型的电子邮件中包含文件附件 jpeg 的原始数据存储在二进制类型的 ndl Image 列中 我有以下代码 DECLARE ReferenceID varc
  • 使用适用于 Windows 的 Git 推送到 GitHub 时出错

    我已经在 GitHub 上创建了一个 Git 存储库 也创建了一个本地存储库 首先 我将远程存储库拉入本地存储库 然后我添加了一个文件 暂存该文件 提交了它 现在我尝试再次推送到远程存储库 但失败并显示以下消息 Pushing to htt
  • 在 Flutter 中隐藏滚动条上的底部导航栏

    我在正文和底部导航栏中有一个博客文章列表 我想在帖子列表向下滚动时使用向下滑动动画隐藏底部导航栏 并在向上滚动时使用向上滑动动画可见 怎么做 此解决方案只是解决此问题的方法 可能会发生一些有害的变化 import package flutt
  • Linq to NHibernate 与 ICriteria

    我通常经常使用 LINQ 尤其是 LINQ to Objects 因此我对 LINQ 相当熟练 我正在考虑使用 LINQ to NHibernate 作为我的 NHibernate 项目的查询语言 当我编写一些测试时 我注意到 LINQ t
  • 使用 Spring Kafka 添加自定义标头

    我计划使用 Spring Kafka 客户端在 Spring Boot 应用程序中使用 kafka 设置并生成消息 我看到 Kafka 0 11 中对自定义标头的支持详情here https issues apache org jira b