我目前正在 Spring Boot 应用程序中对 Apache Kafka 进行大量实验。
我当前的目标是编写一个接收一些消息负载的 REST 端点,它将使用 KafkaTemplate 将数据发送到在端口 9092 上运行的本地 Kafka。
这是我的生产者配置:
@Bean
public Map<String,Object> producerConfig() {
// config settings for creating producers
Map<String,Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,5000);
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,4000);
configProps.put(ProducerConfig.RETRIES_CONFIG,0);
return configProps;
}
@Bean
public ProducerFactory<String,String> producerFactory() {
// creates a kafka producer
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean("kafkaTemplate")
public KafkaTemplate<String,String> kafkaTemplate(){
// template which abstracts sending data to kafka
return new KafkaTemplate<>(producerFactory());
}
我的休息端点转发到服务,该服务如下所示:
@Service
public class KafkaSenderService {
@Qualifier("kafkaTemplate")
private final KafkaTemplate<String,String> kafkaTemplate;
@Autowired
public KafkaSenderService(KafkaTemplate<String,String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessageWithCallback(String message, String topicName) {
// possibility to add callbacks to define what shall happen in success/ error case
ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new KafkaSendCallback<String, String>() {
@Override
public void onFailure(KafkaProducerException ex) {
logger.warn("Message could not be delivered. " + ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, String> result) {
logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
}
});
}
}
现在的情况是:我期望当消息无法发送时调用“onFailure()”方法。但这似乎行不通。当我将生产者配置中的 bootstrapServers 变量更改为 localhost:9091 (这是错误的端口,因此应该无法连接)时,生产者会尝试连接到代理。它将进行多次连接尝试,5 秒后,将发生 TimeOutException。但是“onFailure()”方法不会被调用。有没有办法实现“onFailure()”方法在无法建立连接时可以被调用事件?
顺便说一下,我将重试计数设置为零,但生产者在第一次连接尝试后仍然会进行第二次连接尝试。这是日志输出:
编辑:当代理不可用时,Kafka 生产者/ KafkaTemplate 似乎会进入无限循环。这真的是预期的行为吗?