Spring Apache Kafka onFailure KafkaTemplate 的回调未因连接错误而触发

2023-12-21

我目前正在 Spring Boot 应用程序中对 Apache Kafka 进行大量实验。

我当前的目标是编写一个接收一些消息负载的 REST 端点,它将使用 KafkaTemplate 将数据发送到在端口 9092 上运行的本地 Kafka。

这是我的生产者配置:

@Bean
public Map<String,Object> producerConfig() {

    // config settings for creating producers
    Map<String,Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,5000);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,4000);
    configProps.put(ProducerConfig.RETRIES_CONFIG,0);

    return configProps;

}

@Bean
public ProducerFactory<String,String> producerFactory() {
    // creates a kafka producer
    return new DefaultKafkaProducerFactory<>(producerConfig());
}

@Bean("kafkaTemplate")
public KafkaTemplate<String,String> kafkaTemplate(){
    // template which abstracts sending data to kafka
    return new KafkaTemplate<>(producerFactory());
}

我的休息端点转发到服务,该服务如下所示:

  @Service
    public class KafkaSenderService {

        @Qualifier("kafkaTemplate")
        private final KafkaTemplate<String,String> kafkaTemplate;

        @Autowired
        public KafkaSenderService(KafkaTemplate<String,String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        public void sendMessageWithCallback(String message, String topicName) {

            // possibility to add callbacks to define what shall happen in success/ error case
            ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topicName, message);

            future.addCallback(new KafkaSendCallback<String, String>() {

                @Override
                public void onFailure(KafkaProducerException ex) {
                    logger.warn("Message could not be delivered. " + ex.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
                }
            });

        }
}

现在的情况是:我期望当消息无法发送时调用“onFailure()”方法。但这似乎行不通。当我将生产者配置中的 bootstrapServers 变量更改为 localhost:9091 (这是错误的端口,因此应该无法连接)时,生产者会尝试连接到代理。它将进行多次连接尝试,5 秒后,将发生 TimeOutException。但是“onFailure()”方法不会被调用。有没有办法实现“onFailure()”方法在无法建立连接时可以被调用事件?

顺便说一下,我将重试计数设置为零,但生产者在第一次连接尝试后仍然会进行第二次连接尝试。这是日志输出:

编辑:当代理不可用时,Kafka 生产者/ KafkaTemplate 似乎会进入无限循环。这真的是预期的行为吗?


讨论中回答的问题https://github.com/spring-projects/spring-kafka/discussions/2250# https://github.com/spring-projects/spring-kafka/discussions/2250#对于任何其他偶然发现此线程的人。简而言之,kafkaTemplate.getProducerFactory().reset();就可以了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Apache Kafka onFailure KafkaTemplate 的回调未因连接错误而触发 的相关文章

随机推荐