有时,可以在反序列化之前根据标头值过滤掉消息。使用 spring kafka 是否有针对此场景的任何现有模式。我正在考虑实现类似于 ErrorHandlingDeserializer 除了委托之外还将过滤谓词作为属性。有什么建议么?谢谢。
是的,您可以使用与ErrorHandlingDeserializer
返回一个“标记”对象而不是进行反序列化,然后添加一个RecordFilterStrategy
,过滤带有此类对象的记录,发送给侦听器(使用时的容器工厂)@KafkaListener
或使用过滤适配器作为显式侦听器)。
EDIT
Spring Boot 并添加过滤器...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)