我正在尝试找到一种方法来使用新的 DefaultErrorHandler 而不是 spring-kafka 2.8.1 中已弃用的 SeekToCurrentErrorHandler ,以便在出现错误时覆盖重试默认行为。我想“停止”重试过程,因此如果发生错误,则不应重试。
现在,我在配置类中拥有以下可按预期工作的 bean:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}
由于在这个 spring kafka 版本中,STCEH 已被弃用,我尝试在同一个配置类中执行以下操作:
@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}
但似乎不起作用。如果出现错误,重试次数是默认的,正如我在日志中看到的:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 错误 DefaultErrorHandler - 退避 FixBackOff{interval=0, currentAttempts=10, maxAttempts=9} 已耗尽 topicX
应该如何使用这个 DefaultErrorHandler 才能实现所需的行为?或者我应该使用其他东西?
提前谢谢!