1.业务需求
在springboot项目中,使用spring-kafka消费kafka数据。希望能够控制消费者(KafkaConsumer)启动或停止消费,并且在启动消费时只消费当前时刻以后生产的数据(最新生产的数据),也就是说,启动消费之前未消费的数据不再消费。
2.实现
2.1.创建消费监听
按照官方文档创建一个监听。
官方文档地址
KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
2.2.控制启动/停止消费
通过KafkaListenerEndpointRegistry拿到listenerContainer,操作它即可达到控制目的。
创建一个Kafak控制类,实现控制代码。
KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer开始消费");
}
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
listenerContainer.pause();
log.info("kafka consumer停止消费");
}
}
这样即可通过KafkaCtrlHandler 实例来控制消费者开始或者暂停监听。
2.3.控制启动消费时只消费最新数据
让KafkaConsumer类实现org.springframework.kafka.listener包下的ConsumerSeekAware接口,并实现onPartitionsAssigned方法。
监听创建时,设置各个分区的偏移量。
具体原理待研究,有懂的大佬请留言指教。
新的KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
注意,修改上面的kafka控制类KafkaCtrlHandler.java,停止消费时让监听器停止(stop)而非暂停(pause)。这样监听才会重新创建并设置各分区的偏移量。
新KafkaCtrlHandler.java
@Slf4j
@Component
public class KafkaCtrlHandler {
@Autowired
private KafkaListenerEndpointRegistry registry;
public void start() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
if (!listenerContainer.isRunning()) {
listenerContainer.start();
}
listenerContainer.resume();
log.info("kafka consumer开始消费");
}
public void stop() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("consumer-id");
assert listenerContainer != null;
listenerContainer.stop();
log.info("kafka consumer停止消费");
}
}
2.4.设置springboot 启动时消费者监听不自动启动
创建配置类
KafkaInitialConfiguration.java
@Slf4j
@Configuration
public class KafkaInitialConfiguration {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> customContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setAutoStartup(false);
return factory;
}
}
配置监听工厂
新的KafkaConsumer.java
@Slf4j
@Component
public class KafkaConsumer implements ConsumerSeekAware{
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
@NonNull ConsumerSeekAware.ConsumerSeekCallback callback) {
assignments.keySet().forEach(topicPartition-> callback.seekToEnd(topicPartition.topic(),
topicPartition.partition()));
}
@KafkaListener(
id = "consumer-id",
topics = {"topic1", "topic1", "topic3"},
groupId = "group-id",
containerFactory = "customContainerFactory"
)
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String topic = record.topic();
log.info(">>>kafka>>> topic: {}, msg: {}", topic, message);
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)