springboot集成kafka的相关配置及自定义

2023-05-16

之前的文章末尾,简单的实现了springboot集成kafka,完成了简单的测试,今天我们来扩展一下相关内容。

首先详解一下配置文件的内容:

spring:
  kafka:
    # 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)
    producer:
      bootstrap-servers: 124.223.205.125:9092
      # 发生错误后,消息重发的次数。
      retries: 3
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      properties:
        # 自定义生产者拦截器
        interceptor.classes: com.volga.kafka.interceptor.ProducerPrefixInterceptor
    consumer:
      enable-auto-commit: false #手动提交
      bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}
      # 指定 group_id
      group-id: group_id
      # Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: earliest
      # 指定消息key和消息体的序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 自定义消费者拦截器
        interceptor.classes: com.volga.kafka.interceptor.ConsumerPrefixInterceptor
    # 默认主题
    topic: my-topic
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
      type: batch

以上的producer和consumer的相关配置也可以在java文件中实现:

如上图的实现,自己可以手动实现一下。

kafka内部自己实现分区、策略等一系列的逻辑,当然这些也可以自定义,这里有需要的可以自己研究一下,我这里就不需要了。

接下来,我介绍一个场景,是大家在实际的项目中可以用到。有这样一个场景,在项目中,我们搭建了kafka集群,然而在环境中,会有各种不同的消息,有一些消息不是我们需要的,有些消息是我们需要的,这时我们可以通过过滤器来进行控制和过滤。

在生产者端,我添加一个过滤器在消息前统一加上一个前缀。

 

@Slf4j
public class ProducerPrefixInterceptor implements ProducerInterceptor<String,String> {

    AtomicInteger success = new AtomicInteger(0);
    AtomicInteger fail = new AtomicInteger(0);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 消息统一添加前缀
        String modifyValue = "prefix-"+producerRecord.value();
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), modifyValue, producerRecord.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (Objects.nonNull(recordMetadata)){
            success.incrementAndGet();
        }else {
            fail.incrementAndGet();
        }
    }

    @Override
    public void close() {
        log.info("success:%d\nfail:%d\n",success.get(),fail.get());
        success.set(0);
        fail.set(0);
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

然后在消费者消费时,我遇到prefix前缀的消息时,就统一过滤掉,这不是我所需要的消息。

@Slf4j
public class ConsumerPrefixInterceptor implements ConsumerInterceptor<String,String> {

    /**
     * 过滤掉test开头的数据
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        List<ConsumerRecord<String, String>> filterRecords = new ArrayList<>();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords= new HashMap<>();
        Set<TopicPartition> partitions = consumerRecords.partitions();
        for(TopicPartition tp : partitions){
            List<ConsumerRecord<String, String>> records = consumerRecords.records(tp);
            for(ConsumerRecord<String, String> record: records){
                if(!record.value().startsWith("prefix")) {
                    filterRecords.add(record);
                }
            }
            if(filterRecords.size() > 0){
                newRecords.put(tp, filterRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((k,v) -> log.info("tp:%s--offset:%s\n",k,v));
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

相关的配置在上述的配置文件中也已经列出了,也可以在代码中加以配置。

props.put("properties.interceptor.classes","com.volga.kafka.interceptor.ProducerPrefixInterceptor");

这样就能实现过滤不需要的消息了。

翻看源码中,生产端主要是KafkaProducer<k,v>这个类中KafkaProducer方法:

消费端这边是对应的:KafkaConsumer<k,v>,这个类中KafkaConsumer方法:

源码里kafka实现的配置逻辑大家可以仔细研究一下。

我是空谷有来人,谢谢支持! 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springboot集成kafka的相关配置及自定义 的相关文章

  • about:blank_关于Blank –什么是:blank意味着什么,您应该摆脱它吗?

    about blank Have you ever tried to go to a web page and instead see 34 about blank 34 displayed in the address bar where
  • 剖析NoSQL数据库与MySQL数据库

    NoSQL数据库与MySQL数据库是两种不同类型的数据库管理系统 它们具有不同的特点和应用场景 下面我将详细介绍它们的优势和应用区别 MySQL数据库是一种关系型数据库管理系统 RDBMS 它使用结构化查询语言 SQL 进行数据管理 它采用
  • Linux安装NoSQL数据库

    安装NoSQL数据库的方法因具体数据库而异 下面以几种常见的NoSQL数据库为例 介绍它们的安装方法 MongoDB MongoDB提供了多个平台上的安装方式 包括二进制安装包 apt和yum安装等 具体可以参考MongoDB的官方文档 以
  • .vue 怎么变成 .js,我们来试一试!看完会更懂 Vue 吗?

    本文是基于Vite 43 AntDesignVue打造业务组件库 1 专栏第 12 篇 xff0c 坚持就是胜利 xff01 接着上篇说 xff0c 交付一个 vue 组件不仅需要解析 DSL xff0c 还要处理 JS TS xff0c
  • C 练习2

    define CRT SECURE NO WARNINGS include lt stdio h gt include lt stdlib h gt include lt string h gt 题目 xff1a 企业发放的奖金根据利润提成
  • PCB走线注意事项

    资料来源于瑞芯微电子 晶振布线 xff1a 在设计印刷电路板时 xff1a 保持晶体尽可能靠近可编程芯片晶体引脚 xff0c 保持走线长度短和小 xff0c 以减少电容器负载和防止不必要的噪音辐射 将所有信号从晶体和X1和X2引脚下面走线应
  • C语言 十进制转 二进制 八进制 十六进制

    整体源码 xff1a span class token macro property span class token directive hash span span class token directive keyword inclu
  • Kubuntu 安装fcitx 5

    kubuntu 22 04 安装fcitx5 一 安装前 xff1a 更新软件源 xff0c 不然下载会随缘中断 找到这个文件夹 etc apt 修改sources list 将所有的 http cn archive ubuntu com
  • 在学习opencv 多通道变单通道时 自己出现的一点错误

    include 34 stdafx h 34 include 34 highgui h 34 include 34 cv h 34 int main IplImage str1 str2 str3 str1 61 cvLoadImage 3
  • 计蒜客-1189-树根

    数根可以通过把一个数的各个位上的数字加起来得到 如果得到的数是一位数 xff0c 那么这个数就是数根 如果结果是两位数或者包括更多位的数字 xff0c 那么再把这些数字加起来 如此进行下去 xff0c 直到得到是一位数为止 比如 xff0c
  • HTTP——CDN简单认识

    贴一个B站地址 Up主讲的不错 以下内容是个人整理 CDN CDN Content Delivery Network 内容分发网络 CDN的必要性 服务器 包括云服务器 一定有一个物理位置 访问服务器的客户端如果距离服务器太远 或者服务器附
  • twitter闪退解决办法_那么我们如何解决Twitter? 用户界面改造将是一个不错的起点。...

    twitter闪退解决办法 by Daryll Santos 达里尔 桑托斯 Daryll Santos 那么我们如何解决Twitter xff1f 用户界面改造将是一个不错的起点 So how do we fix Twitter A us
  • MySQL分组查询,获取分组后数据

    MySQL分组查询 xff0c 获取分组后数据 MySQL分组查询 xff0c 将其它列的数据 xff0c 合并到一行展示 xff0c 可以设置去重 xff0c 设置去重 xff0c 设置排序 xff0c 截取指定条数 创建表结构 CREA
  • 解决的问题记录(持续更新)

    1 Ubuntu Server2020 4 树莓派WiringPi的安装与编译 链接 http i lckiss com p span class token operator 61 span span class token number
  • Microsoft Edge浏览器文件保存位置记录

    现在流行的Windows 10操作系统基本上都是使用Edge浏览器了 xff0c 作为一个重要的在线下载工具 xff0c 我得知道他把我下载的文件保存到什么地方去了 下面就记录下这两天观察到的 1 要是直接点击下载 xff0c 另存为xxx
  • python--排错--AttributeError: 'str' object has no attribute 'decode',关于python3的字符串

    AttributeError 39 str 39 object has no attribute 39 decode 39 一般是因为str的类型本身不是bytes xff0c 所以不能解码 两个概念 普通str xff1a 可理解的语义
  • Java向高级进阶(Java开发1-3年的仔看过来)

    遇到技术瓶颈 xff1f 在开发过程你是否觉得自己很多想法可是很难实现 xff1f 就算能实现但是实现的过程却很low xff1f 感觉自己开发几年一直在面向需求开发 xff0c 说好的面向对象开发呢 xff1f xff1f xff1f x
  • vue项目引用图标font-awesome提示错误

    vue引入font awesome图标样式一直报错 如图 xff1a 提示安装 xff1a npm install save font awesome scss font awesom 可以是安装之后还是报错 原来搞错顺序了 xff0c 要
  • IDEA 自动生成类注释和方法注释 (超舒服,超详细篇)

    目录 生成类注释生成类注释模板 生成方法注释生成方法注释模板方式注释的使用 很舒服的 xff0c 很详细的教程步骤 生成类注释 File gt Settings PreferencesEditor gt File and Code Temp
  • springboot项目解决扛住瞬间千次重复提交问题

    前言 在实际的开发项目中 一个对外暴露的接口往往会面临很多次请求 xff0c 我们来解释一下幂等的概念 xff1a 任意多次执行所产生的影响均与一次执行的影响相同 按照这个含义 xff0c 最终的含义就是 对数据库的影响只能是一次性的 xf

随机推荐

  • spring-boot mybatis-plus集成 代码自动生成 和 自定义生成代码 (简单,方便 易理解)

    mybatis plus自定义生成代码 导包在yml配置mybaits plus创建CodeGeneration java类测试看效果图 前言 xff1a 之前都是通过配置generatorConfig xml文件实现自动生成代码 xff0
  • 多线程之间如何实现通信,基础版(示例说明)

    多线程之间如何实现通信 什么是多线程之间通讯 xff1f 多线程之间通讯需求代码实现基本实现 什么是多线程之间通讯 xff1f 多线程之间通讯 xff0c 其实就是多个线程在操作同一个资源 xff0c 但是操作的动作不同 多线程之间通讯需求
  • 简单总结ConcurrentHashMap

    ConcurrentHashMap的概念 哈希算法 xff08 hash algorithm xff09 xff1a 是一种将任意内容的输入转换成相同长度输出的加密方式 xff0c 其输出被称为哈希值 HashMap与HashTable 因
  • openstack负载均衡_使用OpenStack Trove和Manila扩展DBaaS工作负载

    openstack负载均衡 稳定的数据库是任何企业应用程序中最常需要的组件之一 xff0c 而OpenStack背后的开发社区正在努力确保在开源云中使用数据库是一种简单 xff0c 可靠和高效的体验 Tesora的Amrith Kumar
  • 线程池原理分析(附线程池原理图)

    目录 什么是线程池使用线程池的好处线程池的作用线程池创建的四种方式重点介绍newFixedThreadPool线程池 线程池原理解析合理配置线程池大小 线程池就是这么简单 什么是线程池 线程池其实就是将多个线程对象放到一个容器当中 使用线程
  • Java反射机制,通过反射机制手写一个spring ioc框架

    Java基础 xff1a 反射机制 什么是反射Java反射机制的作用Java反射机制如何禁止Java反射机制的应用Java反射机制 常用 Api通过反射机制手写 spring ioc 示例 什么是SpringIOC什么是SpringIOC底
  • Java之设计模式(一): 单例模式

    单例模式 什么是单例模式 xff1f 单例模式应用场景饿汉式懒汉式静态内部类总结 设计模式 xff08 Design pattern xff09 是一套被反复使用 多数人知晓的 经过分类编目的 代码设计经验的总结 使用设计模式是为了可重用代
  • Java内存结构:基本概念

    基本概念 Java内存结构方法区 xff1a 堆 xff1a 栈 xff1a 内地方法栈 xff1a PC寄存器 xff1a 执行引擎 xff1a 垃圾回收机制 前言 面试经常被问到JAVA内存模型和Java内存结构的区别 JAVA内存模型
  • Java内存结构:jvm调优 堆调优

    堆 什么是堆堆结构图堆的参数配置设置最大堆内存设置新生代与老年代优化参数设置新生代比例参数内存溢出解决办法 什么是堆 是new创建出来的对象或数组存放在堆中 堆结构图 堆的参数配置 XX 43 PrintGC 每次触发GC的时候打印相关日志
  • springboot 2.3之后消失的hibernate-validator

    spirngboot升级到2 3之后 xff0c hibernate validator消失 项目升级到springboot2 3之后 xff0c 参数校验的注解报错 xff0c 发现spring boot starter web的依赖项已
  • CentOS7.6下MySQL8.0 tar.xz 安装详解

    安装详解 环境准备下载安装包安装步骤创建mysql用户更改目录权限初始化mysql设置环境变量添加开机自启动启动MYSQL服务登陆MYSQL修改密码修改公网访问设置防火墙 环境准备 1 由于Centos7自带Mariadb xff0c 卸载
  • Linux 防火墙 常用命令 基于Centos7

    常用操作 firewalld的基本使用服务之间的常用命令配置firewalld cmd firewalld的基本使用 启动 xff1a systemctl start firewalld 关闭 xff1a systemctl stop fi
  • MySQL主从复制和读写分离

    MySQL主从复制和读写分离 MySQL主从复制的好处主从复制的概念与作用主从复制的原理主从复制与读写分离的关系主从复制的配置修改主 master 服务器主服务器给从服务器账号授权修改从 slave 服务器连接主服务器开始同步操作 MySQ
  • java:组织机构树实现名称模糊查询

    在开发的很多场景中 xff0c 需要我们对树形结构进行模糊查询 xff0c 这个时候就需要用到递归来实现定位 xff0c 效果如下图 xff1a 话不多说 xff0c 直接上代码吧 xff1a 首先是对json的实例化对象处理 public
  • openstack dns_使用OpenStack Designate构建DNS即服务

    openstack dns Designate是一个多租户DNS即服务 xff0c 包括用于域和记录管理的REST API xff0c 用于与Neutron集成的框架以及对Bind9的集成支持 您可能需要考虑以下DNSaaS xff1a 干
  • mybatis plus 之UpdateWrapper操作

    目前java开发的系统为了可移植性更高 xff0c 都采用了mybatis plus来做持久层 xff0c 现在介绍一下其中UpdateWrapper的用法 xff1a UpdateWrapper lt TemplateDetailsEnt
  • 阿里域名+腾讯云服务器组合部署网站

    前段时间 xff0c 我在阿里云上购买了一个域名 本想着也在阿里云上购买一台入门级服务器先玩玩 xff0c 但是在看完一系列的产品后 xff0c 还是放弃了阿里云的服务器 对比几家之后还是选择了腾讯云 xff0c 接下来 xff0c 就是踩
  • 解决spring security登出时会由https重定向为http

    我们在集成spring security框架时 xff0c 默认的登出会走一次重定向login logout xff0c 这是如果发布到生产环境 xff0c 则会登出有问题 具体报错就是说由https和http混用访问资源的错误 xff0c
  • docker安装kafka,并集成springboot进行测试

    大家好 xff0c 今天我们开始学习kafka中间件 xff0c 今天我们改变一下策略 xff0c 不刷视频学习 xff0c 改为实践学习 xff0c 在网上找一些案例功能去做 xff0c 来达到学习实践的目的 首先 xff0c 是安装相关
  • springboot集成kafka的相关配置及自定义

    之前的文章末尾 xff0c 简单的实现了springboot集成kafka xff0c 完成了简单的测试 xff0c 今天我们来扩展一下相关内容 首先详解一下配置文件的内容 xff1a spring kafka 指定 kafka 地址 xf