我想要实现的场景是使用来自 Kafka 的消息,处理它,如果某些条件失败我不希望确认该消息。为此,我在 Spring Cloud Stream 参考文档中找到了,
自动提交偏移量
处理消息后是否自动提交偏移量。如果设置为 false,则消息标头中将提供确认标头以供后期确认。
默认值:true。
我的问题是,将 autoCommitOffset 设置为 false 后,我如何确认消息?代码示例将不胜感激。
我已经在这里提供了问题的答案https://github.com/spring-cloud/spring-cloud-stream/issues/575 https://github.com/spring-cloud/spring-cloud-stream/issues/575
本质上,这取决于设置
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
然后处理确认标头:
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)