spring集成kafka,实现一个topic可以被多个group消费

2023-11-12

    由于新公司是做物联网的,公司刚起步,没什么项目,就是在做一些基础的服务的搭建,现在微服务这么火,可想而知,Spring Boot ,Spring Cloud 是必须要会的技能,而做物联网,把各种智能设备的数据采集上来,也避免不了要用到消息系统。所以我们的架构师从众多消息中间件中选出了kafka和mqtt。mqtt在物联网中的作用不言而去,我这边也在学,从mosquitto服务端搭建,到通过mqtt的java客户端paho 实现消息的生产和消费,不过用paho实现太过复杂,还是用springboot为我们提供的实现方式简便。no no,偏离主题了,今天主要说的是kafka。

    首先呢,kafka集群和zookeeper的集群的搭建我这就不赘述了,网上有很多,其实还是考研自己的linux水平,linux熟的 这些服务搭建 就是小菜一碟。

    kafka消费者有一个熟悉groupId 就是一个topic中的消息只能被同一个groupId的消费者中的一个消费者消费。

这个groupId,在配置消费者时指定。

但是问题来了,怎么实现让一个topic可以让不同group消费呢。

这个为也不会,哈哈

所以上网查了,看了一个说的特别好。

意思就是

goupid不要用配置文件配置的方式

细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

动手吧。

新建一个KafkaConsumerConfig类,代码如下,指定了两个容器,也就两个group

分别为kafkaListenerContainerFactory1和kafkaListenerContainerFactory2

import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
@Configuration
public class KafkaConsumerConfig {
 
    private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";
 
    private String group1 = "test1";
    private String group2 = "test2";
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory1());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory2());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }
    
    public Map<String, Object> getCommonPropertis() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }
    
 
    public ConsumerFactory<String, String> consumerFactory1() {
        Map<String, Object> properties = getCommonPropertis();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    
    public ConsumerFactory<String, String> consumerFactory2() {
    	 Map<String, Object> properties = getCommonPropertis();
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);
         return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
}

上面代码中,其实,很多配置项,你也可以直接用@value的方式,从配置文件中读取过来,那么需要修改参数值的时候,就直接更改配置文件就行了,这点相信就不用教了,不懂的网上一搜一堆。

最后,在@KafkaListener 中指定容器名称

@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
启动,你就会发现,卧槽,还真可以
[           main] xxx     : Kafka version : 0.10.1.1
[           main] xxx     : Kafka commitId : f10ef2720b03b247
[           main] xxx     : Tomcat started on port(s): 82 (http)
[           main] xxx     : Started App in 3.913 seconds (JVM running for 4.321)
[    test2-0-C-1] xxx     : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group test2.
[    test1-0-C-1] xxx     : Discovered coordinator 192.168.52.131:9092 (id: 2147483645 rack: null) for group test1.

至此,就实现了多个customer不同group的功能,亲测有效。

参考:https://blog.csdn.net/caijiapeng0102/article/details/80765923

https://www.jianshu.com/p/6a44da908e48  高版本 在@KafkaListener 注解中有groupId属性可以设置


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring集成kafka,实现一个topic可以被多个group消费 的相关文章

  • aarch64交叉编译libturbojpeg

    0 目的 因项目上有使用jpeg turbo的需求 用的aarch64的平台 需要使用交叉编译工具来获取libturbojpeg so 1 libturbojpeg下载 libturbojpeg源码git地址 GitHub libjpeg

随机推荐

  • Hyperledger Fabric如何通过虚拟机部署以太坊智能合约

    EVM作为用户链代码安装到Fabric中 然后可以通过它部署智能合约 单个EVM链代码足以在通道上运行多个以太坊智能合约 链码不采用以太坊的共识方法 所有事务仍将遵循Fabric事务流中的执行 订单 验证步骤 确保在不同组织中的足够对等方安
  • 判断子序列

    给定字符串 s 和 t 判断 s 是否为 t 的子序列 leetcode链接 你可以认为 s 和 t 中仅包含英文小写字母 字符串 t 可能会很长 长度 500 000 而 s是个短字符串 长度 lt 100 字符串的一个子序列是原始字符串
  • 项目经理与技术经理的区别

    项目经理和技术经理有什么区别 区别有多大呢 这个问题此前我从没有认真的去思考过 直到被明确的问到的时候 才细思极恐 连两者的区别都没能有条理的讲出来 一 关于项目经理 在没有真正进入软件行业之前 对于系统集成方面的项目还是有些心得的 有种一
  • 编译Linux内核获取LLVM bitcode (LLVM IR)

    最近要获取linux内核的llvm bitcode 以便后续进行分析 例如获取callgraph等等 尝试了很多提取llvm bitcode的方法 用build bom wllvm 过程中也出现了很多错误 最后用wllvm终于成功了 记录下
  • Docker容器与虚拟化技术:Docker资源控制、数据管理

    目录 一 理论 1 资源控制 2 Docker数据管理 二 实验 1 Docker资源控制 2 Docker数据管理 三 问题 1 docker容器故障导致大量日志集满 造成磁盘空间满 2 当日志占满之后如何处理 四 总结 一 理论 1 资
  • 【模电】0014 运放自激振荡和消除(补偿)

    一般我们讨论的负反馈放大电路多关注其幅频特性 也就是它的增益 而对其相频特性关注的不多 这主要是因为 一个放大电路如果它工作状态是稳定的 其输入和输出相差一定的相位对分析它的特性并不影响 只是相当于信号延迟了一点时间 注意这里有个前提条件
  • 关于IP网段间互访的问题——路由是根本

    文章出处 http blog csdn net dog250 archive 2010 02 09 5303291 aspx 之所以IP网段间可以互相访问 完全靠的就是路由 因此路由是IP通信的根本 IP是机器可以进行通信的资格证书 而路由
  • 【满分】【华为OD机试真题2023 JAVA&JS】工作安排

    华为OD机试真题 2023年度机试题库全覆盖 刷题指南点这里 工作安排 知识点循环数组贪心动态规划 时间限制 1s 空间限制 32MB 限定语言 不限 题目描述 小明每周上班都会拿到自己的工作清单 工作清单内包含n项工作 每项工作都有对应的
  • Vue Baidu Map使用

    百度地图官方提供的是常规
  • DS单链表--类实现

    用C 语言和类实现单链表 含头结点 属性包括 data数据域 next指针域 操作包括 插入 删除 查找 注意 单链表不是数组 所以位置从1开始对应首结点 头结点不放数据 输入 n 第1行先输入n表示有n个数据 接着输入n个数据 第2行输入
  • 螺旋队列(由里向外)

    假设有如下排列 21 22 20 7 8 9 10 19 6 1 2 11 18 5 4 3 12 17 16 15 14 13 1的坐标是 0 0 3的坐标是 1 1 7的坐标是 1 1 分析 第1层之内有1个数 第2层之内有9个数 第3
  • Kafka Connect JNDI注入漏洞复现(CVE-2023-25194)

    漏洞原理 Apache Kafka Connect中存在JNDI注入漏洞 当攻击者可访问Kafka Connect Worker 且可以创建或修改连接器时 通过设置sasl jaas config属性为com sun security au
  • Qt界面刷新优化的一些心得

    背景 一个类似Windows任务管理器的性能界面 该界面有多个曲线图同时定时刷新 每个曲线图包括多条曲线更新 数据更新频率大概为一秒一次 程序中曲线图已封装为自定义的图表类 给出数据更新接口 初始方案 主程序设计方案上 采用数据收集和数据更
  • 《云计算与大数据技术应用》

    第一章 云计算概述 1 1什么是云计算 1 1 1云计算的定义 云计算是分布式计算的一种 指的是通过网络 云 将巨大的数据计算处理程序分解成无数个小程序 然后 通过多部服务器组成的系统进行处理和分析这些小程序得到结果并返回给用户 云计算早期
  • javascript数组的直接量语言允许有可选的结尾的逗号

    var undefs 数组有2个元素 都是undefined 数组直接量的语法允许有可选的结尾的逗号 故 只有两个元素
  • OA会议管理系统之我的会议(会议排座&可拖拽座位&附源码)

    目录 一 前言 1 导读 二 我的会议功能实现 1 功能介绍 2 功能分析 3 功能实现 3 1 前期准备 3 2 dao层编写 3 3 jsp页面搭建 3 4 Web层编写 4 案例展示 一 前言 1 导读 OA会议管理系统之会议发布 内
  • RabbitMQ提供了6种消息模型介绍

    RabbitMQ提供了6种消息模型 但是第6种其实是RPC 并不是MQ 因此不予学习 那么也就剩下5种 但是其实3 4 5这三种都属于订阅模型 只不过进行路由的方式不同 一 基本消息模型 RabbitMQ是一个消息代理 它接受和转发消息 你
  • oracle删除重复数据保留第一条记录

    1 查找表中多余的重复记录 重复记录是根据单个字段 Id 来判断 select from 表 where Id in select Id from 表 group byId having count Id gt 1 2 删除表中多余的重复记
  • zoj 1201 Inversion

    题目意思 如果输入的是p类串 则输出i类串 如果输出的是i类 则输出p类串 p转i 寻找在p串中在j左边的比j大的数的个数 i串中的第j个数填为该数 i转p 从尾部开始 若第j个数的值为x 则说明在p串中j的左边有x个数大于j 通过从后到前
  • spring集成kafka,实现一个topic可以被多个group消费

    由于新公司是做物联网的 公司刚起步 没什么项目 就是在做一些基础的服务的搭建 现在微服务这么火 可想而知 Spring Boot Spring Cloud 是必须要会的技能 而做物联网 把各种智能设备的数据采集上来 也避免不了要用到消息系统