我有一个让 kafkalistener 从头开始读取消息的工作代码(offset=0)
一个主题(始终运行)。
对于我的用例(消息传递),我需要两件事:
始终捕获特定主题/分区的新消息(该消费者始终在运行)并发送到前端 websocket+stomp。 (这部分我已经有了)
仅当前端发出信号时,启动新的消费者以获取从特定主题/分区的开始到当前的消息,然后停止,以便前端 websocket+stomp 可以获取这些数据(为后期用户或稍后加载先前的消息)(在会议开始时)
如果我可以动态地(从前端获取信号后)添加/删除带参数的 kafkaListener(来自 post 方法的数据),它将同时服务
实际上,我该如何实现呢?我是否应该考虑使用 post 方法来通知后端我需要立即加载此主题/分区的先前消息并将其发送到此“..”url?但那么我如何动态启动和关闭该消费者(kafkaListener)而不一直运行并在那里传递参数呢?
这是一个快速的 Spring Boot 应用程序,展示了如何动态创建容器。
@SpringBootApplication
public class So61950229Application {
public static void main(String[] args) {
SpringApplication.run(So61950229Application.class, args);
}
@Bean
public ApplicationRunner runner(DynamicListener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so61950229", "foo" + i));
System.out.println("Hit enter to start a listener");
System.in.read();
listener.newContainer("so61950229", 0);
System.out.println("Hit enter to start another listener");
System.in.read();
listener.newContainer("so61950229", 0);
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61950229").partitions(1).replicas(1).build();
}
}
@Component
class DynamicListener {
private static final Logger LOG = LoggerFactory.getLogger(DynamicListener.class);
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final ConcurrentMap<String, AbstractMessageListenerContainer<String, String>> containers
= new ConcurrentHashMap<>();
DynamicListener(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.factory = factory;
}
void newContainer(String topic, int partition) {
ConcurrentMessageListenerContainer<String, String> container =
this.factory.createContainer(new TopicPartitionOffset(topic, partition));
String groupId = UUID.randomUUID().toString();
container.getContainerProperties().setGroupId(groupId);
container.setupMessageListener((MessageListener) record -> {
System.out.println(record);
});
this.containers.put(groupId, container);
container.start();
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
AbstractMessageListenerContainer<String, String> container = this.containers.remove(
event.getContainer(ConcurrentMessageListenerContainer.class).getContainerProperties().getGroupId());
if (container != null) {
LOG.info("Stopping idle container");
container.stop(() -> LOG.info("Stopped"));
}
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5000
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)