Apche Kafka + Spring的消息监听容器

2023-11-10

一、消息的接收

消息的接收:可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注释来接收消息。本章我们主要说明通过配置MessageListenerContainer并提供消息侦听器的方式接收消息。

1.1、消息监听器

当使用消息监听容器时,就必须提供一个监听器来接收数据。目前有八个支持消息侦听器的接口:

public interface MessageListener<K, V> { 
     // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> { 
    // 当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    //当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。提供对 Consumer 对象的访问。
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { 
   //当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。
    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 
    // 当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    // 当使用自动提交或容器管理的提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。提供对 Consumer 对象的访问。
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    //当使用手动提交方法之一时,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。提供对 Consumer 对象的访问。
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

注意:1、 Consumer对象不是线程安全的;2、不应执行任何Consumer<?, ?>影响消费者位置和/或监听器中已提交偏移量的方法;容器需要管理这些信息。

二、消息监听容器

2.1、 实现方法

MessageListenerContainer 提供了两种实现方式 :
1、KafkaMessageListenerContainer,
2、ConcurrentMessageListenerContainer

2.1.1、KafkaMessageListenerContainer

2.1.1.1、 基本概念

KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。委托ConcurrentMessageListenerContainer给一个或多个KafkaMessageListenerContainer实例以提供多线程消费。

  • 从2.2.7版本开始,可以添加一个记录拦截器(RecordInterceptor)监听器容器;它将在调用侦听器之前调用,以允许检查或修改记录。如果拦截器返回 null,则不会调用侦听器。
  • 从版本 2.7 开始,它具有在侦听器退出后(通常或通过抛出异常)调用的附加方法。
  • 批处理拦截器(BatchInterceptor)为批量监听器(Batch Listeners)提供类似的功能。
  • 此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。 例如,这可以用于访问拦截器中的消费者指标。
  • CompositeRecordInterceptor and CompositeBatchInterceptor可以调用多个拦截器。
  • 默认情况下,当使用事务时,拦截器在事务启动后被调用。从版本 2.3.4 开始,可以设置侦听器容器的 interceptBeforeTx 属性在事务开始之前调用拦截器。
  • 从版本 2.3.8、2.4.6 开始,当并发大于 1 时 ConcurrentMessageListenerContainer 支持静态成员资格。 group.instance.id 后缀为 -n ,起始n于1。这与增加 session.timeout.ms 的值 一起可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
  • 静态成员资格是指在提高流应用程序、消费者组和其他构建在组再平衡协议之上的应用程序的可用性。再平衡协议依赖组协调器为组成员分配实体 ID。这些生成的 ID 是短暂的,并且会在成员重新启动和重新加入时发生变化。对于基于消费者的应用程序,这种“动态成员资格”可能会导致在管理操作(例如代码部署、配置更新和定期重新启动)期间将大部分任务重新分配给不同的实例。对于大型状态应用程序,洗牌任务在处理之前需要很长时间才能恢复其本地状态,从而导致应用程序部分或完全不可用。受这一观察的启发,Kafka 的组管理协议允许组成员提供持久的实体 ID。根据这些 ID,组成员资格保持不变,因此不会触发重新平衡。

同样的,拦截器中不应该执行任何影响消费者的位置和/或提交的偏移量的方法,容器需要管理这些信息。

如果拦截器改变了记录(通过创建新记录),则topic、partition和offset必须保持不变,以避免意外的副作用,例如记录丢失。

2.1.1.2、如何使用 KafkaMessageListenerContainer

  • KafkaMessageListenerContainer 构造函数

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                      ContainerProperties containerProperties)
    

    该构造函数接收接收消费者工厂(ConsumerFactory)有关对象中主题和分区以及其他配置的信息。

  • 容器属性(ContainerProperties)包含3个构造函数,下面我们一个一个介绍它们。
    1、以TopicPartitionOffset为参数

    public ContainerProperties(TopicPartitionOffset... topicPartitions)
    

    该构造函数采用一个主题分区偏移量(TopicPartitionOffset)参数数组来显式指示容器要使用哪些分区(使用消费者assign()方法)并带有可选的初始偏移量。默认情况下,正值是绝对偏移量,负值是相对于分区内当前最后一个偏移量。TopicPartitionOffset提供了一个带有附加参数的构造函,boolean如果是true,则在容器启动时相对于该消费者的当前位置初始偏移(正或负)。
    2、以String为参数

    public ContainerProperties(String... topics)
    

    该构造函数采用主题数组,Kafka 根据属性分配分区group.id——在组中分配分区
    3、以Pattern为参数

    public ContainerProperties(Pattern topicPattern)
    

    该构造函数使用正则表达式Pattern来选择主题。

  • 如何将监听器分配给容器
    监听器有了容器也有了,如何将监听器分配给容器呢?。要将 MessageListener 分配给容器,可以在创建 Container 时使用 ContainerProps.setMessageListener 方法:

    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    containerProps.setMessageListener(new MessageListener<Integer, String>() {
      ...
    });
    DefaultKafkaConsumerFactory<Integer, String> cf =
                          new DefaultKafkaConsumerFactory<>(consumerProps());
    KafkaMessageListenerContainer<Integer, String> container =
                          new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

    要注意的是,在创建 DefaultKafkaConsumerFactory 时,使用仅接受上述属性的构造函数意味着从配置中选取键和值反序列化器类。 或者,反序列化器实例可以传递到 DefaultKafkaConsumerFactory 构造函数以获取键和/或值,在这种情况下,所有消费者共享相同的实例。 另一种选择是提供Supplier(从版本2.3开始),它将用于为每个消费者获取单独的Deserializer实例:

     DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                          new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new      CustomValueDeserializer());
     KafkaMessageListenerContainer<Integer, String> container =
                          new KafkaMessageListenerContainer<>(cf, containerProps);
    return container;
    

从版本 2.3.5 开始,引入了一个名为authorizationExceptionRetryInterval 的新容器属性。 这会导致容器在从 KafkaConsumer 获取任何 AuthorizationException 后重试获取消息。 例如,当配置的用户被拒绝读取特定主题时,就会发生这种情况。 定义authorizationExceptionRetryInterval应该有助于应用程序在授予适当的权限后立即恢复。

2.1.2、ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer只有一个构造函数与构造函数类似 KafkaListenerContainer。

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它有一个concurrency属性,这个属性的作用是创建几个 KafkaMessageListenerContainer 实例。例如:container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

当监听多个主题时,默认的分区分布可能不是我们所期望的。 例如,如果有 3 个主题,每个主题有 5 个分区,并且我们想要使用 concurrency=15,但是我们只会看到 5 个活动使用者,每个使用者从每个主题分配一个分区,而其他 10 个使用者处于空闲状态。 这是因为默认的 Kafka PartitionAssignor 是 RangeAssignor。 对于这种情况,我们需要考虑使用 RoundRobinAssignor,它将分区分配给所有使用者。 然后,为每个消费者分配一个主题或分区。 我们可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者属性来更改要更改PartitionAssignor。(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
在springboot中可以这样:
spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor

当使用 TopicPartitionOffset 配置容器属性时,ConcurrentMessageListenerContainer 会在委托 KafkaMessageListenerContainer 实例之间分发 TopicPartitionOffset 实例。

假设提供了 6 个 TopicPartitionOffset 实例,并发度为 3; 每个容器有两个分区。 对于五个
TopicPartitionOffset 实例,两个容器获得两个分区,第三个容器获得一个分区。 如果并发数大于TopicPartition的数量,则降低并发数,使每个容器获得一个分区。

三、偏移

spring提供了几个偏移选项, 如果 enable.auto.commit 消费者属性为 true,Kafka会根据其配置自动提交偏移量。 如果为 false,则容器支持多种 AckMode 设置。 默认 AckMode 为 BATCH。

从版本 2.3 开始,框架将 enable.auto.commit 设置为 false,除非在配置中明确设置。以前,如果未设置该属性,则使用 Kafka 默认值 (true)。

消费者 poll() 方法返回一个或多个 ConsumerRecord。 为每条记录调用 MessageListener。 以下列表描述了容器对每个 AckMode 采取的操作(当未使用事务时):

  • RECORD:当侦听器处理记录后返回时提交偏移量。

  • BATCH:当 poll() 返回的所有记录都已处理完毕时提交偏移量。

  • TIME:当 poll() 返回的所有记录都处理完毕后,只要超过了自上次提交以来的 ackTime,就提交偏移量。

  • COUNT:当 poll() 返回的所有记录都已处理完毕时,提交偏移量,只要自上次提交以来已收到 ackCount 条记录。

  • COUNT_TIME:与 TIME 和 COUNT 类似,但如果任一条件为真,则执行提交。

  • MANUAL:消息侦听器负责acknowledge() 确认。 之后,应用与 BATCH 相同的语义。

  • MANUAL_IMMEDIATE:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

使用事务(transactions)时,偏移量将发送到事务,语义相当于 RECORD 或 BATCH,具体取决于侦听器类型(记录或批处理)。MANUAL 和 MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。

根据syncCommits容器属性,使用消费者上的commitSync()或commitAsync()方法。 默认情况下,syncCommits 为 true。

作者个人建议建议设置:ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。

从版本 2.3 开始,Acknowledgment 接口增加了两个方法 nack(long sleep) 和 nack(int index, long sleep)。 第一个与记录侦听器一起使用,第二个与批处理侦听器一起使用。 为侦听器类型调用错误的方法将引发 IllegalStateException。在此之前他是这样:

public interface Acknowledgment {

    void acknowledge();

}
  • 如果要提交部分批次,使用 nack()。
  • 使用事务时,将 AckMode 设置为 MANUAL;
  • 调用 nack() 会将成功处理的记录的偏移量发送到事务。
  • nack() 只能在调用侦听器的消费者线程上调用。
  • 当调用 nack() 时,将提交所有挂起的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次轮询时重新传递失败的记录和未处理的记录( )。
  • 通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。 这与在容器配置了 SeekToCurrentErrorHandler 时抛出异常的功能类似。

当通过组管理使用分区分配时,确保 sleep 参数(加上处理先前轮询的记录所花费的时间)小于使用者 max.poll.interval.ms属性,这个非常重要

四、监听器容器自动启动

侦听器容器实现 SmartLifecycle,并且 autoStartup 默认为 true。 容器在后期启动 (Integer.MAX-VALUE - 100)。 实现 SmartLifecycle 来处理来自侦听器的数据的其他组件应在早期阶段启动。 -100 为后续阶段留出了空间,使组件能够在容器之后自动启动。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apche Kafka + Spring的消息监听容器 的相关文章

随机推荐

  • 透视Matplotlib核心功能和工具包 - Seaborn工具包

    Seaborn是基于Matplotlib构建的功能强大的可视化工具 它使多变量探索性数据分析更加容易和直观 并且增加了一些新类型的图 并且其背景样式和颜色图更加令人愉悦 它具有许多内置的统计功能 使其成为统计数据分析的首选工具 它还具有非常
  • Web学习之TypeScript

    文章目录 一 TypeScript是什么 二 TypeScript配置 三 变量声明 四 解构 五 函数 六 类Class 七 模块Module 八 总结 九 学习资料 一 TypeScript是什么 TypeScript是JavaScri
  • PTA 7-38 等边三角形面积

    PTA 7 38 等边三角形面积 数学基础对于程序设计能力而言很重要 对于等边三角形面积 请选择合适的方法计算之 输入格式 测试数据有多组 处理到文件尾 每组测试输入1个实数表示等边三角形的边长 输出格式 对于每组测试 在一行上输出等边三角
  • Ubuntu 14.04 Tab补全忽略大小写

    0 前言 当目录名以大写字母开头时 通过cd命令进入该目录就不太方便 需要切换到大写输入 如果Tab补全可以忽略大小写的话 这个问题就引刃而解 1 设置方法 1 在 目录中创建 inputrc 2 打开 inputrc 添加如下设置 set
  • 面向产品分析的内容

    声明 本文是学习GB T 42859 2023 航天产品质量问题三个面向分析方法实施要求 而整理的学习笔记 分享出来希望更多人受益 如果存在侵权请及时联系我们 1 范围 本文件规定了航天产品质量问题三个面向分析方法实施的一般要求 程序和分析
  • Pandas库基础知识(一)

    文章目录 1 数据结构 1 Series 数据结构 1 Series对象的创建 2 Series对象的属性 3 Series对象的基本操作 2 DataFrame 数据结构 1 DataFrame对象的创建 2 DataFrame对象的属性
  • scrapy-redis分布式爬虫框架详解

    scrapy redis分布式爬虫框架详解 随着互联网技术的发展与应用的普及 网络作为信息的载体 已经成为社会大众参与社会生活的一种重要信息渠道 由于互联网是开放的 每个人都可以在网络上发表信息 内容涉及各个方面 小到心情日志 大到国家大事
  • sqllab 1-6 练习

    前言 什么是sql注入 攻击者通过构造不同的sql语句来实现对数据库的操作 两个关键 参数用户可控 参数带入数据库查询 基本流程 判断注入点 判断字段数 判断回显点 查询相关内容 判断库名 gt 判断表明 gt 判断列名 gt 判断数据 搭
  • xxl-job详细使用指南

    新建任务说明 本篇文章承接上文 xxl job快速入门指南 上一次和大家简单介绍了下 xxl job 的由来以及使用方法 本篇文章将会详细介绍一些高级使用方法及特性 上文中我们在新建一个任务的时候发现有很多的选项 现在我们来详细聊一聊他们的
  • Jquery加载本地文件出现跨域错误的解决方案

    禁止跨域是浏览器的安全限制机制 会报告上述错误 但是可以通过设置来绕过这个限制 如果经常调试前端代码 可以在本机装个web容器 常见的方式 右击chrome快捷方式 选择 属性 在 快捷方式 下的 目标 中添加 allow file acc
  • HT for Web (Hightopo) 使用心得(6)- 3D场景环境配置(天空球,雾化,辉光,景深)

    在前一篇文章 Hightopo 使用心得 5 动画的实现 中 我们将一个直升机模型放到了3D场景中 同时 还利用动画实现了让该直升机围绕山体巡逻 在这篇文章中 我们将对上一篇的场景进行一些环境上的丰富与美化 让场景更真实一些 具体涉及到的知
  • 微信小程序基础入门的知识点

    微信小程序基础入门的知识点 1 窗口配置 就是在我们app json文件就是对我们微信小程序进行全局配置 它决定我们页面文件的路径 设置多个tab 1 1 pages设置页面的路径 数组的第一个就是我们小程序初始页面 文件名不需要我们写文件
  • C语言入门之自定义结构体数据struct Student { int num; char name[20]; char sex; int age; 类型

    用户自己建立由不同类型数据组成的组合型的数据结构 它称为结构体 例如 一个学生的学号 姓名 性别 年龄 成绩 家庭地址等项 是属于同一个学生的 因此组成一个组合数据 如student 1的变量 反映它们之间的内在联系 struct Stud
  • 字节跳动,华为,阿里巴巴,小米,腾讯 2021大厂面试经历系列之初、中、高级测试工程师面试题汇总(附答案)

    前言 最近看到很多人都在找工作 而且很多人都感觉今年找工作比去年难很多 竞争力也增加不少 因此激发我整理这份资料 希望能帮到正在找或者准备找工作的童鞋们 首先我们能否获得一个面试机会 那肯定是从简历开始 简历需要做好功夫 一份好的简历才足够
  • Springboot集成Jasypt实现配置文件加密

    不容错过的成长之旅 Jasypt介绍 Jasypt是一个java库 它允许开发员以最少的努力为他 她的项目添加基本的加密功能 并且不需要对加密工作原理有深入的了解 用于单向和双向加密的高安全性 基于标准的加密技术 加密密码 文本 数字 二进
  • 解决Windows10中文用户名带来软件无法打开的影响

    众所周知 许多国外的开发软件都不支持中文的文件路径名 即使软件的路径无中文字符 可你系统用户的名称是中文 同样软件无法运行 因为大部分软件的在电脑用户上的缓冲文件都是在 C user 你的用户名称 AppData local temp 解决
  • Power BI和Tableau对比分析,到底要学哪个?

    Power BI和Tableau对比分析 到底要学哪个 一 两个工具优缺点 Power BI Tableau 二 职业需求 前程无忧 智联招聘 三 总结 学习tableau还是power bi想必是很多初学者的疑惑 可以从以下两个角度去考虑
  • 一台服务器想用150个ip,可以吗?

    不可以 一台服务器一次只支持一个ip 可以更换 但每次绑定一个 如果需要多个ip 只能多开服务器 挂店铺的 包括淘宝店铺 拼多多店铺 亚马逊店铺都想店铺不被关联 想买台服务器 可以的 一个店铺买一台服务器 有几个店铺就买几台服务器 ip的带
  • 数据结构:(代码篇:004)二叉搜索树 BST(二叉查找树)的基本操作

    1 基本概念 二叉搜索树 是二叉树是真子集 要求 根节点大于或等于左子树的节点值 小于右子树的节点值 他的中序遍历是有序的 基本操作有 查找 插入 删除 建树 查找和插入以及建树都很简单 和二叉树的差不多 删除略微复杂 2 存储结构 str
  • Apche Kafka + Spring的消息监听容器

    目录 一 消息的接收 1 1 消息监听器 二 消息监听容器 2 1 实现方法 2 1 1 KafkaMessageListenerContainer 2 1 1 1 基本概念 2 1 1 2 如何使用 KafkaMessageListene