springboot2.x +kafka使用和源码分析九(KafkaListenerEndpointRegistry暂停启动容器)

2023-05-16

我们在运行中如果需要暂停启动容器时可以通过此类KafkaListenerEndpointRegistry来处理。

KafkaListenerEndpointRegistry源码(只解释了核心代码):


public class KafkaListenerEndpointRegistry implements
        //当spring销毁bean时执行操作
        DisposableBean,
        //在Spring加载和初始化所有bean后开启异步线程来管理容器
        SmartLifecycle,
        //注入ApplicationContext对象
        ApplicationContextAware,
        //(spring观察者模式体现)
        //监听ContextRefreshedEvent事件
        ApplicationListener<ContextRefreshedEvent>  {

    //存放所有listenerContainers线程安全容器
    private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();

    private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;

    //applicationContext 对象
    private ConfigurableApplicationContext applicationContext;

    //applicationContext 刷新标志位
    private boolean contextRefreshed;

    //判断该线程是否运行
    private volatile boolean running;


    //获取spring容器中bean id的ListenerContainer
    public MessageListenerContainer getListenerContainer(String id) {
        .....
    }

    //获取所有容器ids
    public Set<String> getListenerContainerIds() {
        ......
    }

    //获取托管容器的集合 但不包含被声明为Bean的容器
    public Collection<MessageListenerContainer> getListenerContainers() {
        .....
    }

    //获取所有容器的集合 包括由注册表管理的容器和声明为bean的容器
    public Collection<MessageListenerContainer> getAllListenerContainers() {
        .....
    }

    //注册新的监听容器
    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    }


    //DisposableBean  接口中实现方法 当KafkaListenerEndpointRegistry被销毁时 销毁所有容器
    public void destroy() {
        .....
    }

    //当spring中有ContextRefreshedEvent事件产生 执行
    public void onApplicationEvent(ContextRefreshedEvent event) {
	    .....
	}


    //启动所有MessageListenerContainer容器
    public void start() {
       .....
    }


    //停止所有MessageListenerContainer容器
    public void stop() {
        .....
    }
    
}

测试代码 :

@SpringBootApplication
//ApplicationListener 监听KafkaEvent事件
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    //接收KafkaEvent事件 
    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            //当容器.pause()是会触发ConsumerPausedEvent事件
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            //当当容器.pause()是会触发ConsumerResumedEvent事件
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(3)
            .replicas(1)
            .build();
    }

}

备注:对上述描述中 ApplicationListener 可参考后续发布的spring提供的事件机制

测试结果:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springboot2.x +kafka使用和源码分析九(KafkaListenerEndpointRegistry暂停启动容器) 的相关文章

  • Springboot2(23)轻松整合shiro(带验证码)

    源码地址 springboot2教程系列 Shiro配置 1 Spring集成Shiro一般通过xml配置 SpringBoot集成Shiro一般通过java代码配合 Configuration和 Bean配置 2 Shiro的核心通过过滤
  • Spring Boot Kafka - 序列化和反序列化JSON

    文章目录 Spring Boot Kafka 序列化和反序列化JSON 前言 配置JsonSerializer和JsonDeserializer 定义一个Model类 Producer类 Consumer类 Controller类 测试 小
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin
  • MQ如何保证消息不丢失

    如何保证消息不丢失 哪些环节会造成消息丢失 其实主要就是跨网络的环境中需要考虑消息的丢失 主要是有以下几个方面 生产者往MQ发送消息 MQ的Broker是集群有主从的 主节点把消息同步到从节点时也需要考虑消息丢失问题 消息从内存持久化到硬盘
  • Kafka【命令行操作】

    Kafka 命令行操作 Kafka 主要包括三大部分 生产者 主题分区节点 消费者 1 Topic 命令行操作 也就是我们 kafka 下的脚本 kafka topics sh 的相关操作 常用命令行操作 参数 描述 bootstrap s
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • Kafka剖析(一):Kafka背景及架构介绍

    转载自 http www infoq com cn articles kafka analysis part 1 Kafka 是由 LinkedIn 开发的一个分布式的消息系统 使用 Scala 编写 它以可水平扩展和高吞吐率而被广泛使用
  • Kafka一文懂

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的 它是一个分布式的 支持多分区 多副本 基于 Zookeeper 的分布式消息流平台 它同时也是一款开源的基于发布订阅模式的消息引擎系统 Kafka 的基本
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • kafka 监控工具--CMAK

    CMAK previously known as Kafka Manager is a tool for managing Apache Kafka clusters See below for details about the name
  • kafka(三)重平衡

    历史文章 kafka 一 kafka的基础与常用配置 文章目录 一 kafka消费者组 二 重平衡 Rebalance 2 1 重平衡触发条件 2 2 重平衡策略 2 2 1 Range 平均分配 2 2 2 RoundRobin 轮询分配
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • 在windows系统下使用IDEA对kafka源码进行编译环境搭建以及配置

    目录 一 前期准备工作 step1 安装JDK1 8 step2 安装zookeeper单机版 step3 安装Gradle 5 4 step4 安装scala 2 11 12 二 将kafka源代码部署到编辑器IDEA并测试 step1
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De

随机推荐

  • Mysql环境变量配置

    一 mysql的环境变量配置步骤 1 1 在桌面选择 计算机 的图标 xff0c 右键 gt 属性 gt 点击 高级系统设置 gt 点击 环境变量 2 2 新建MYSQL HOME变量 xff0c 并将值设置为C Program Files
  • MySQL安装配置教程(超级详细)

    一 下载MySQL Mysql官网下载地址 xff1a MySQL Download MySQL Installer Archived Versions 1 选择要安装的版本 xff0c 本篇文章选择的是5 7 31版本 xff0c 点击D
  • mysql字符切割的四种方式

    1 从左开始截取字符串 left xff08 str length xff09 说明 xff1a left xff08 被截取字段 xff0c 截取长度 xff09 select left 39 如果暴力不是为了杀戮 xff0c 那将变得毫
  • Podman 使用指南

    原文链接 xff1a Podman 使用指南 Podman 原来是 CRI O 项目的一部分 xff0c 后来被分离成一个单独的项目叫 libpod Podman 的使用体验和 Docker 类似 xff0c 不同的是 Podman 没有
  • ClickHouse查询语句详解

    ClickHouse查询语句兼容大部分SQL语法 xff0c 并且进行了更加丰富的扩展 xff0c 查询语句模板如下 xff1a WITH expr list subquery SELECT DISTINCT ON column1 colu
  • Mysql和Redis如何保证数据一致性

    文章目录 前言一 先更新数据库 xff0c 再更新redis二 先更新redis xff0c 在更新数据库三 先更新数据库 xff0c 再删除redis四 先删除redis xff0c 再更新数据库总结 前言 如何保证数据库和缓存双写一致
  • SQL——左连接(Left join)、右连接(Right join)、内连接(Inner join)

    文章目录 前言 一 概念 二 例子 总结 前言 最近在做SQL相关的练习 发现以前那么自信的SQL放久了不碰也变得棘手起来 特别是这一块表之间的内外连接 所以这篇是关于这个内外连接的整理 一 概念 首先还是介绍一下这三个的定义 1 Left
  • UCOSII之项目实战总结

    电子IT行业博大精深 xff0c 没有人能够用笔记本天天记录自己所学的知识 xff0c 于是乎 xff0c 撰写博客就成了每个 IT民工 的专长 再者 xff0c 写一篇博客 xff0c 其意义与不但记录了自己所需的知识 xff0c 更提高
  • 明白了一句话:“加速度信号对高频敏感,位移信号对低频敏感”

    以前听别人说这些 xff0c 然后记住了 但是一直不大理解 最近在调试IEPE传感器 xff0c 正好要算位移 速度 加速度 对于相同的速度 xff0c 频率越高 xff0c 加速度值就越大 因为从公式就能看出来 xff0c 对于固定频率的
  • ubuntu 16.04使用IntelRealSense D435i调用realsense ROS包时,报symbol lookup error和undefined symbol错误的解决办法

    在ubuntu 16 04使用IntelRealSense D435i调用realsense ROS包时 xff0c 运行 roscore roslaunch realsense2 camera rs rgbd launch 出现错误 xf
  • Android浪潮

    Google的Android手机就要席卷世界了 xff01 IT技术的发展常常太出人意料 xff0c 我也想不太清楚Google的Android平台究竟吸引人在哪里 xff0c 但我相信Android会很快改变手机平台的格局 新的形势会出人
  • 卡尔曼滤波相关介绍及优缺点

    1 卡尔曼滤波算法为什么会叫滤波算法 xff1f 以一维卡尔曼滤波为例 xff0c 如果我们单纯的相信测量的信号 xff0c 那么这个信号是包含噪声的 xff0c 是很毛糙的 xff0c 但是当我们运行卡尔曼滤波算法去做估计 xff0c 我
  • STM32单片机(五)-寄存器地址理解和控制LED闪烁

    芯片 xff1a stm32f103zet6 1 存储单元一般应具有存储数据和读写数据的功能 一般以8位二进制作为一个存储单元 也就是一个字节 每个单元有一个地址 是一个整数编码 可以表示为二进制整数 2 stm32是32位单片机 xff0
  • 跨平台构建 Docker 镜像新姿势,x86、arm 一把梭

    点击 34 阅读原文 34 可以获得更好的阅读体验 在工作和生活中 xff0c 我们可能经常需要将某个程序跑在不同的 CPU 架构上 xff0c 比如让某些不可描述的软件运行在树莓派或嵌入式路由器设备上 特别是 Docker 席卷全球之后
  • 正点原子STM32F4笔记

    使用寄存器操作 xff0c 不错的博客 xff1a https blog csdn net w471176877 article category 1230060 https blog csdn net w471176877 article
  • JAVA中this用法小结

    我知道很多朋友都和我一样 xff1a 在 JAVA 程序中似乎经常见到 this xff0c 自己也偶尔用到它 xff0c 但是到底 this 该怎么用 xff0c 却心中无数 xff01 很多人一提起它 xff0c 就说 当前对象 xff
  • Linux Platform总线+SPI总线分析

    2015 07 1 11 20 本文以MPC8308 powerpc架构 xff0c HX软件包为依据 xff0c 详细内容可参考源码 CPU e300c3MPC8308 400MHz BOARD Freescale MPC8308ERDB
  • ubuntu下SD卡分区与挂载

    本来只是想借SD卡来做一个OK6410的升级 但笔记本上只装了ubuntu xff0c 一开始是可以识别sd卡的 xff0c 但按照网上的教程不小心将 dev sdb1删除了 导致ubuntu不能识别sd卡了 记录一下解决过程 1 sd的设
  • 信号量与互斥锁的一些理解

    一直对信号量和互斥锁只有一个模糊的认识 xff0c 今天特别学习了 xff0c 总结一下 一 从作用上来讲 互斥锁是用在多线程多任务互斥的 信号量用于线程的同步 二 从原理上讲 线程互斥锁 pthread mutex t 的实现原理 xff
  • springboot2.x +kafka使用和源码分析九(KafkaListenerEndpointRegistry暂停启动容器)

    我们在运行中如果需要暂停启动容器时可以通过此类KafkaListenerEndpointRegistry来处理 KafkaListenerEndpointRegistry源码 只解释了核心代码 public class KafkaListe