I use 春季卡夫卡 API通过手动偏移量管理来实现 Kafka 消费者:
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
在这里,我希望消费者仅在以下情况下提交偏移量:someCondition
成立。否则消费者应该睡一会儿并阅读同一条消息 again.
卡夫卡配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
在当前配置下,如果someCondition == false
,消费者不提交偏移量,但仍然读取下一条消息。如果卡夫卡有没有办法让消费者重读消息acknowledgement
没有执行?
正如@Gary 已经指出的,你的方向是正确的,seek()
就是这样做的方法。今天,当我遇到这个问题时,我找不到它的代码示例。这是任何想要解决问题的人的代码。
public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {
private ConsumerSeekCallback consumerSeekCallback;
@Override
public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
if (/*some condition*/) {
//process
acknowledgment.acknowledge(); //send ack
} else {
consumerSeekCallback.seek("your.topic", record.partition(), record.offset());
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.consumerSeekCallback = consumerSeekCallback;
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
// nothing is needed here for this program
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)