我们在运行中如果需要暂停启动容器时可以通过此类KafkaListenerEndpointRegistry来处理。
KafkaListenerEndpointRegistry源码(只解释了核心代码):
public class KafkaListenerEndpointRegistry implements
//当spring销毁bean时执行操作
DisposableBean,
//在Spring加载和初始化所有bean后开启异步线程来管理容器
SmartLifecycle,
//注入ApplicationContext对象
ApplicationContextAware,
//(spring观察者模式体现)
//监听ContextRefreshedEvent事件
ApplicationListener<ContextRefreshedEvent> {
//存放所有listenerContainers线程安全容器
private final Map<String, MessageListenerContainer> listenerContainers =
new ConcurrentHashMap<String, MessageListenerContainer>();
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
//applicationContext 对象
private ConfigurableApplicationContext applicationContext;
//applicationContext 刷新标志位
private boolean contextRefreshed;
//判断该线程是否运行
private volatile boolean running;
//获取spring容器中bean id的ListenerContainer
public MessageListenerContainer getListenerContainer(String id) {
.....
}
//获取所有容器ids
public Set<String> getListenerContainerIds() {
......
}
//获取托管容器的集合 但不包含被声明为Bean的容器
public Collection<MessageListenerContainer> getListenerContainers() {
.....
}
//获取所有容器的集合 包括由注册表管理的容器和声明为bean的容器
public Collection<MessageListenerContainer> getAllListenerContainers() {
.....
}
//注册新的监听容器
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
}
//DisposableBean 接口中实现方法 当KafkaListenerEndpointRegistry被销毁时 销毁所有容器
public void destroy() {
.....
}
//当spring中有ContextRefreshedEvent事件产生 执行
public void onApplicationEvent(ContextRefreshedEvent event) {
.....
}
//启动所有MessageListenerContainer容器
public void start() {
.....
}
//停止所有MessageListenerContainer容器
public void stop() {
.....
}
}
测试代码 :
@SpringBootApplication
//ApplicationListener 监听KafkaEvent事件
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
//接收KafkaEvent事件
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
//当容器.pause()是会触发ConsumerPausedEvent事件
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
//当当容器.pause()是会触发ConsumerResumedEvent事件
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(3)
.replicas(1)
.build();
}
}
备注:对上述描述中 ApplicationListener 可参考后续发布的spring提供的事件机制
测试结果:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)