Spring Boot 中使用@KafkaListener并发批量接收消息

2023-05-16

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

启动日志截图
这里写图片描述

关于max.poll.records和max.poll.interval.ms官方解释截图:
这里写图片描述

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

具体代码在这里,欢迎加星号,fork。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Boot 中使用@KafkaListener并发批量接收消息 的相关文章

  • 计算器1.0

    数据结构让我们相遇 计算器1 0正式上线 xff1a define h span class token macro property span class token directive keyword include span span
  • 常用软件滤波方法及其示例程序

    常用软件滤波方法及其示例程序 作者 未知 来源 发布时间 2005 08 24 浏览次数 lt script language 61 34 Javascript 34 src 61 34 view php articleid 61 10 3
  • I2C协议调试总结

    1 协议总结 开始 xff1a 在SCL为高期间 xff0c SDA由高变低 结束 xff1a 在SCL为高期间 xff0c SDA由低变高 应答 xff1a 时钟脉冲 xff19 期间释放数据线 xff0c SCL为高期间 xff0c S
  • mtk6765上i2c-tools的使用

    1 下载i2c tools 从开源网站http dl lm sensors org i2c tools releases 下载i2c tools 几个版本都可以用 xff0c 可以选择下载i2c tools 4 3 xff12 到官网下载交
  • mt6762/mt6765平台i2c驱动能力修改与波形优化

    mt6762在连接一些外设时 xff0c i2c在低电平会拉不低的情况 xff0c 最低电平在0 4V左右 xff0e 这时需要去掉外设的上拉电阻或调整硬件i2c的驱动电流 xff0e 修改文件路径 xff1a vendor mediate
  • cmake find_package 原理简介以及使用说明

    下面简单介绍Cmake 如何使用find package命令对外部库进行查找 cmake本身不提供任何关于搜索库的便捷方法 xff0c 也不会对库本身的环境变量进行设置 它仅仅是按照优先级顺序在指定的搜索路径进行查找Findxxx cmak
  • MT6762平台NXP NFC代码移植要点

    xff11 找到官网 xff0c 获取源码包 NXP NFC移植源码 https github com NXPNFCProject 固件 https github com NXP nfc NXPNFCC FW tags 2 主要介绍源码移植
  • mt6762添加gpio按键方法与问题调试

    mt6762添加gpio按键方法与问题调试 1 配置dts文件 xff0e 定义gpio87为拍照键 对于keycode 61 212 gpio keys gpio keys compatible 61 34 gpio keys 34 in
  • GD33F303RTC6串口USART0重映射问题

    GD32F303RTC6的串口有多个 xff0c 其中USART0串口可以定义为PA9 PA10 xff0c 也可以映射定义为PB6 PB7 USART0默认是用PA9 PA10 xff0c 如果需要映射到PB6 PB7 则需要gpio p
  • APM32F103CBT6调试接口复用为GPIO配置问题

    APM32F103CBT6调试接口复用为GPIO配置问题 xff1a 可以通过调整复用配置 xff0c 改为GPIO口 xff0c xff08 注意 xff1a 改为上电默认为GPIO口后 xff0c 将无法再用调试功能 xff0c 慎重选
  • APM32F103 USB键盘如何唤醒PC机

    USB2 0全速设备的技术规范 xff0c 所以支持远程唤醒的功能 xff0c 本文简单介绍全速USB模式下 xff0c USB键盘如何休眠PC xff0c 唤醒PC机 描述符配置 APM32F103鼠标设备配置描述符源代码如下 xff0c
  • 搭建机器人电控系统——如何从零开发主控?——编译环境的选择KEIL、IAR、STM32CubeMX、ROS

    搭建机器人电控系统 如何从零开发主控 xff1f 编译环境的选择 主控的从零开发 编译环境的选择 我以我目前接触到的编译环境为例子 xff0c 目前机器人主控的编译环境一般有三种 xff1a KEIL STM32CubeMX IAR LIN
  • 计算机视觉实验三-全景图像拼接

    目录 一 图像映射与全景拼接 1 1 简介 1 2基础流程 1 3计算图像之间的变换结构 1 4图像拼接的几何原理 编辑 1 5变换类型选择 1 6 2D 图像变换原理 1 7 图像映射流程 1 8 图像拼接整体流程 二 全景拼接测试 2
  • 倒立摆系统

    倒立摆系统是典型的多变量 xff0c 非线性 xff0c 自不稳定和强耦合不确定系统 它可以有效反应控制过程中的许多关键问题 xff0c 是测试各种控制理论的理想模型 以倒立摆为控制对象 xff0c 研究者们已对各种控制理论进行了验证 xf
  • FREERTOS使用任务通知和队列进行串口实时通信实例

    1 xff0c 帧间隔小于10ms的批量数据刷新 采用接收中断 43 空闲中断配合任务通知实现定时处理 2 xff0c 批量大数据传输 采用接收中断 43 队列方式实现接收 3 xff0c 固定帧格式慢速数据 xff0c 采用空闲中断 43
  • ROS+openCV图像处理方法及案例

    本文内容环境Ubuntu20 04 首先 xff0c 我们需要安装openCV sudo apt get install ros noetic vision opencv libopencv dev python3 opencv 然后需要安
  • ubuntu c++使用eigen提示"fatal error: Eigen/Dense: No such file or directory"的解决办法

    include lt Eigen Dense gt 记得命名安装了eigen库啊 为啥提示找不到库文件啊 xff1f 初步怀疑环境的配置问题 xff0e 查到到eigen的安装路径为 xff1a usr include eigen3 Eig
  • C++学习之路(一):搭建C++开发环境

    文章目录 前言一 Windows平台下C 43 43 环境搭建1 1 Visual Studio安装1 2 创建C 43 43 项目 二 Linux平台下C 43 43 环境搭建2 1 安装编译工具2 2 cmake脚本编写 结束 前言 在
  • 为什么要使用事件而不是委托变量?

    用委托变量会让客户端Main方法里可以直接调用 用事件则提高了封装性 class Program static void Main string args Pub p 61 new Pub Sub s 61 new Sub p Number
  • FreeRTOS原理,在STM32下完成一个基于FreeRTOS的多任务程序

    一 学习FreeROTS 1 Free 即免费的 xff0c RTOS 全称是 Real Time Operating System xff0c 中文就是实时操作系统 注意 xff0c RTOS 不是指某一个确定的系统 xff0c 而是指一

随机推荐