Hy,
我正在使用 spring-kafka 1.3.0.RELEASE 创建事务生产者。
当引导服务器关闭时,DefaultKafkaProducerFactory 会无休止地等待,直到引导服务器启动。
我究竟做错了什么 ?
我可以设置超时和/或其他类似属性吗?
这是我的代码示例,用于重现该场景:
public static void main(String[] args) {
final DefaultKafkaProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix("transactionIdPrefix");
final Producer<Object, Object> producer = producerFactory.createProducer();
System.out.println("Created producer:" + producer);
}
private static Map<String, Object> producerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
return props;
}
这是由工厂调用引起的initTransactions()
创建生产者后,例如,如果没有足够的代理来支持事务日志复制因子。
我不知道为什么超时不适用于该操作。
我们也许可以改变工厂来推迟initTransactions()
直到第一个beginTransaction()
- 但这只会将问题推向下游。
我使用 kafka 1.0.0 客户端进行了测试(可以与 1.3.1 或更高版本一起使用 - 目前为 1.3.2),但仍然存在问题。我认为应该尊重TRANSACTION_TIMEOUT_CONFIG
但似乎并非如此。
我建议在 Kafka 上开一个问题JIRA https://issues.apache.org/jira/browse/KAFKA.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)