spring boot配置双Kafka方法

2023-11-09

第一步:application.yml的配置

server:
  port: 8080
spring:
  application:
    name: demo 
  kafka:
    one:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        group-id: xxxx
        enable-auto-commit: true
    two:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group-id: xxxx
        enable-auto-commit: true

第二步:配置config

@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(20);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}
@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(6);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        //        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}

注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行

第三步:

@Resource
private KafkaTemplate<String, String> xxxOneTemplate;
@Resource
private KafkaTemplate<String, String> xxxxTwoTemplate;

直接在你要用到的类中直接引用就行。

跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有研究成功就不献丑了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring boot配置双Kafka方法 的相关文章

  • Android:java.lang.OutOfMemoryError:

    我在 Android 上开发了一个使用大量图像的应用程序 可绘制文件夹中有很多图像 比如说超过 100 张 我正在开发图像动画应用程序 我使用 imageview 来显示 GIF 图像 我使用了将 gif 图像分割成多个 PNG 格式图像的
  • 了解 netty 通道缓冲区和水印

    我正在尝试了解网络缓冲区和水印 作为一个测试用例 我有一个 netty 服务器 它向客户端写入数据 客户端被阻止 基本上每次读取之间有 10 秒的睡眠时间 在正常 I O 下 如果接收方被阻塞 TCP 发送方将受到限制 由于流量控制 发送速
  • 如何准确判断 double 是否为整数? [复制]

    这个问题在这里已经有答案了 具体来说 在 Java 中 我如何确定double是一个整数 为了澄清 我想知道如何确定 double 实际上不包含任何分数或小数 我主要关心的是浮点数的性质 我想到的方法 以及我通过谷歌找到的方法 基本上遵循以
  • H2数据库:如何进行加密保护,而不暴露文件加密密钥

    我们在服务器模式下使用Java H2数据库 因为我们不希望用户访问数据库文件 为了对数据库文件添加更多保护 我们计划使用 AES 加密 将 CIPHER AES 添加到数据库 URL 以防存储被盗 但是 每个用户在连接时还需要提供文件保护密
  • Selenium 和 TestNG 同时使用“dependsOn”和“priority =”问题

    我正在努力在 GUI 自动化测试中实现更好的工作流程控制 我首先从dependsOn开始 但很快发现缺点是如果一个测试失败 则套件的整个其余部分都不会运行 所以我改用 priority 但看到了意外的行为 一个例子 Test priorit
  • Java Junit 测试 HTTP POST 请求

    我需要测试以下方法而不改变方法本身 该方法向服务器发出 POST 方法 但我需要制作一个独立于服务器的测试用例 在将其重定向到本地文件之前 我测试了类似的方法 但为此我将协议指定为文件 主机名指定为 localhost 端口指定为 1 我的
  • 使用 Jena 查询维基数据

    目前 Wikidata 有一个 SPARQL 端点 https query wikidata org https query wikidata org 我想使用 Jena 3 0 1 查询此网站 我使用以下代码 但收到错误消息 端点返回的
  • 如何在Netbeans中设置JList的ListModel?

    我在 Netbeans IDE 的帮助下设计了一个 Swing GUI 该 GUI 包含一个 JList 默认情况下 它使用 QAbstractListModel 将其作为 JList 构造函数中的参数传递以创建该 JList 我想在 Ne
  • 是否可以手动检查 LocateRegistry 是否存在?

    I 已经发现 https stackoverflow com a 8338852 897090一种安全的方式获得LocateRegistry 即使注册表尚不存在 Registry registry null try registry Loc
  • 在Java中如何将字节数组转换为十六进制?

    我有一个字节数组 我希望该数组的每个字节字符串转换为其相应的十六进制值 Java中有没有将字节数组转换为十六进制的函数 byte bytes 1 0 1 2 3 StringBuilder sb new StringBuilder for
  • java中如何重新初始化int数组

    class PassingRefByVal static void Change int pArray pArray 0 888 This change affects the original element pArray new int
  • RxJava android mvp 单元测试 NullPointerException

    我是 mvp 单元测试的新手 我想对演示者进行一个非常基本的测试 它负责登录 我只想断言 view onLoginSuccess 这是演示者代码 public LoginPresenter LoginViewContract loginVi
  • ActiveMQ JNDI 查找问题

    尝试使用 JNDI 运行以下 ActiveMQ http activemq apache org jndi support html http ActiveMQ 20JNDI 并且我的 jboss server node lib 文件夹中有
  • Java 8 方法签名不一致

    Java 8 为我们提供了具有很长签名的新方法 如下所示 static
  • 获取 Future 对象的进度的能力

    参考 java util concurrent 包和 Future 接口 我注意到 除非我弄错了 只有 SwingWorker 实现类才能启动冗长的任务并能够查询进度 这就引出了以下问题 有没有办法在非 GUI 非 Swing 应用程序 映
  • 开发者环境-如何调用/消费其他微服务

    背景 我的环境 Java Play2 MySql 我在 Play2 gt S1 S2 S3 上编写了 3 个无状态 Restful 微服务 S1 消耗来自 S2 和 S3 的数据 因此 当用户点击 S1 时 该服务会异步调用 S2 S3 合
  • 为什么 java.util.Arraylist#clear 按照 OpenJDK 中的方式实现?

    http grepcode com file repository grepcode com java root jdk openjdk 6 b14 java util ArrayList java 473 http grepcode co
  • 如何使用eclipse调试JSP tomcat服务?

    我想使用 Eclipse IDE 调试器来调试单独运行的 JSP Struts Tomcat Hibernate 应用程序堆栈 如何设置 java JVM 和 eclipse 以便设置断点 监视变量值并查看当前正在执行的代码 我刚刚用谷歌搜
  • java Web应用程序中的日期转换

    String date1 13 03 2014 16 56 46 AEDT SimpleDateFormat sdf new SimpleDateFormat dd MM yyyy HH mm ss z sdf setTimeZone Ti
  • Android ClassNotFoundException:在路径上找不到类

    10 22 15 29 40 897 E AndroidRuntime 2561 FATAL EXCEPTION main 10 22 15 29 40 897 E AndroidRuntime 2561 java lang Runtime

随机推荐

  • 华为OD机试 - 选修课(Java & JS & Python)

    题目描述 给定一个元素类型为小写字符串的数组 请计算两个没有相同字符的元素长度乘积的最大值 如果没有符合条件的两个元素 返回0 输入描述 第一行为第一门选修课学生的成绩 第二行为第二门选修课学生的成绩 每行数据中学生之间以英文分号分隔 每个
  • 汇编笔记

    更新于20190929 1 Intel和AT T汇编 参数是反的 AT T寄存器前加 常量前加 Intel mov rax rcx rcx gt rax mov cl 2 对应AT T movq rcx rax rcx gt rax mov
  • RHEL/centos8.0离线安装n卡驱动,cuda10.1,cudnn7.5,anaconda3,pycharm以及mmdeection和simpledet的搭建

    我最近在两台RHEL8 0的服务器装了这些玩意 特此记录一下 1 离线安装nvidia driver cuda10 1 cudnn7 5 关键因素 显卡型号 Quadro P4000 系统 RHEL 8 0 用 cat etc redhat
  • IPv4与ipv6联系

    IPv4又称互联网通信协议第四版 是网际协议开发过程中的第四个修订版本 也是此协议第一个被广泛部署的版本 但是2019年11月26日 全球所有43亿个IPv4地址已分配完毕 IPV6是互联网工程任务组设计的用于替代IPv4的下一代IP协议
  • Java高级程序设计_JAVA高级程序设计

    恢复内容开始 import java awt import java awt event ActionEvent import java awt event ActionListener import java awt event Mous
  • 12个C语言必背实例

    C语言实例第01期 十进制数转换二进制数 实例代码 include stdio h int main int m n k 定义变量 int a 16 0 printf 请输入一个0 32767之间的数字 n scanf d n printf
  • ImageNet零样本准确率首次超过80%,地表最强开源CLIP模型更新

    关注公众号 发现CV技术之美 本文转自新智元 编辑LRS 开源模型OpenCLIP达成ImageNet里程碑成就 虽然ImageNet早已完成历史使命 但其在计算机视觉领域仍然是一个关键的数据集 2016年 在ImageNet上训练后的分类
  • STM32--STM32CubeMX的Timer3定时1ms功能HAL库操作

    一 STM32CubeMX的设置 时钟源的选择 Crystal Ceramic Resonator 调试方法选择 Serial Wire 时钟输入为40MHz Timer3的参数设置 使能Timer3的中断 点击 Generate Code
  • unigui中的unidbgrid单元格内容太长自动回行

    1 servermodule中customcss中加入
  • Android 使用ffmpeg软编码 将摄像头采集视频编码成视频文件

    Android 使用ffmpeg软编码 将摄像头采集视频编码成视频文件 这次代码实现的是视频采集的功能 Android 通过jni 调用ffmpeg 编码yuv数据变成视频文件 先上代码 编码器上下文保存的实体 struct EnCodeB
  • R语言课程论文

    本文是自己在学习R统计分析课程后的课程小论文 对详细详细的文档及实现的R代码感兴趣者 可见文末获取方式 若转载请注明出处 欢迎大家交流学习 不足之处请多指教 Word版全文以及相应的r代码获取方式 资源下载链接 https download
  • 学一点Wi-Fi:WPA3 BP/OCV/SCV/PK/H2E/TD

    WFA在2020年底发布了WPA3标准的第三版 其中又提出了一些新的feature 这里结合之前的版本简单总结一下 1 BP BP是Beacon Protection的缩写 问 Beacon中的信息都是未加密的 所以可能存在攻击者会对AP发
  • 课设:影院管理系统

    影院管理系统 导言 知识点总结 课设介绍 导言 从3月份开始到现在 大概两周多的时间 写了一个影院管理系统 功能有待改善 有的功能还有点bug需要该 现在总结一下 影院管理系统告一段落 接下来要学习算法和数据结构 知识点总结 一 三层架构
  • 一文看懂npm、yarn、pnpm之间的区别

    本文作者对比了当前主流的包管理工具npm yarn pnpm之间的区别 并提出了合适的使用建议 以下为译文 NPM npm是Node js能够如此成功的主要原因之一 npm团队做了很多的工作 以确保npm保持向后兼容 并在不同的环境中保持一
  • 大数据毕设选题 - 深度学习口罩佩戴检测系统(python OpenCV YOLO)

    文章目录 0 前言 1 课题介绍 2 算法原理 2 1 算法简介 2 2 网络架构 3 关键代码 4 数据集 4 1 安装 4 2 打开 4 3 选择yolo标注格式 4 4 打标签 4 5 保存 5 训练 6 实现效果 6 1 pyqt实
  • Linux(ubuntu)上安装RDP Server(Xrdp)使用的注意事项

    ubuntu上的基本安装方法 1 apt get install xrdp 基本上就已经安装完成了 但是此时连接会出现异常 类似黑屏的情况 原因 1 Xrdp不支持unity 3D的图形 解决方法 1 使用xfce或者gnome 2d等 如
  • C#小知识

    项目编译后复制文件到生成目录 方法1 对于单个文件 可以点击属性 输出目录里选择始终复制 方法2 把项目中的ServerScripts复制到输出目录 在项目设置中 生成事件里添加批处理 xcopy ProjectDir ServerScri
  • anaconda用法

    查看已经安装的包 pip list 或者 conda list 安装和更新 pip install requests pip install requests upgrade 或者 conda install requests conda
  • LINUX权限-bash: ./startup.sh: Permission denied

    LINUX权限 bash startup sh Permission denied 执行 startup sh 或者 shutdown sh的时候 报 Permission denied 需要用命令 chmod 修改一下bin目录下的 sh
  • spring boot配置双Kafka方法

    第一步 application yml的配置 server port 8080 spring application name demo kafka one bootstrap servers xxx xxx xxx xxx consume