我想将 Spring Kafka 与事务一起使用,但我不太明白它应该如何配置以及它如何工作。
这是我的配置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ACKS_CONFIG, "all");
此配置用于带有事务 id 前缀的 DefaultKafkaProducerFactory 中:
defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");
问题1:
我应该如何选择这个交易ID前缀?
如果我理解正确的话,spring 使用这个前缀为创建的每个生产者生成一个事务 id。
为什么我们不能只使用“UUID.randomUUID()”?
问题2:
如果生产者被销毁,它将生成一个新的事务id。
因此,如果应用程序崩溃,重新启动时它将重用旧的事务 ID。
这正常吗???
问题3:
我正在使用部署在云上的应用程序,该应用程序可以自动放大/缩小。
这意味着我的前缀无法修复,因为每个实例上的所有生产者都将具有冲突的事务 ID。
我应该在其中添加随机部分吗?
当实例缩小/放大或崩溃并重新启动时,我是否需要恢复相同的前缀?
问题4:
最后但并非最不重要的一点是,我们正在使用 Kafka 的凭据。
这似乎不起作用:
Current ACLs for resource `TransactionalId:my_app*`:
User:CN... has Allow permission for operations: All from hosts: *
知道我的事务 ID 已生成,我应该如何设置 ACL?
Edit 1
进一步阅读后,如果我理解正确的话。
如果您有从 P0(分区)读取 C0(消费者)。如果代理启动消费者重新平衡。
P0 可以分配给另一个消费者 C1。
这个消费者 C1 应该使用与之前的 C0 相同的事务 ID 以防止重复(僵尸围栏)?
你如何在 spring-kafka 中实现这一点?事务 ID 似乎与消费者无关,因此与分区读取无关。
Thanks