我是 kafka 新手,我使用 Kafka Producer Java api。
面对Kafka的这个问题,Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION
.
人们已经写过producer.abortTransaction()
仅当没有正在进行的交易时才应调用......
知道如何检查飞行中是否有交易吗?以及如何清除/停止它们?
这是我的代码:
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e != null){
logger.info("Record was not sent due to kafka issue");
throw new KafkaException("Record was not sent due to kafka issue");
}
}
});
} catch (KafkaException e){
producer.abortTransaction();
}
我需要实现的是检测kafka何时停止,在这种情况下清除所有缓冲区,以便当kafka再次启动时这些缓冲区中的记录不会出现在消费者端。
在这种情况下,您通常要做的是应用 Java 文档中描述的事务。Kafka生产者 https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
这样,如果满足以下条件,则 100 条记录要么全部对消费者可见,要么全部不可见:isolation.level
被设定为read_committed
.
你正在关闭producer.close()
对于不可恢复的异常,例如
-
生产者防护异常:这个致命异常表明另一个生产者具有相同的transactional.id
已经开始了。只能有一个生产者实例transactional.id
在任何给定时间,最新启动的实例都会“隔离”先前的实例,以便它们无法再发出事务请求。当遇到此异常时,必须关闭生产者实例。
-
乱序序列异常:此异常表明代理从生产者处收到了意外的序列号,这意味着数据可能已丢失。如果生产者仅配置为幂等性(即如果enable.idempotence
已设置并且没有transactional.id
已配置),可以继续使用相同的生产者实例发送,但这样做有重新排序发送记录的风险。对于事务性生产者来说,这是一个致命错误,您应该关闭生产者。
-
授权异常:[不言自明]
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)