如果我使用 kafka-connect 来消费消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件负载一起存储?我希望使用这些数据对消息进行一些排序,并检查是否存在任何间隙或检查我收到的消息中是否有重复项。 (例如,如果我的消费者偏移量被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写一个自定义订阅者?
根据有关的文档插入字段 https://docs.confluent.io/current/connect/transforms/insertfield.html#id1转换,你可以使用offset.field
:
Name Description
offset.field Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
总体而言,您的单消息转换 (SMT) 配置如下所示:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"
如果这不是您想要的,那么总是可以选择创建您的定制 https://docs.confluent.io/current/connect/transforms/custom.html#custom-transform转变
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)