我一直在尝试将 Confluence 中的 kafka-avro-console-consumer 连接到我们的旧版 Kafka 集群,该集群是在没有 Confluence Schema Registry 的情况下部署的。
我使用以下属性显式提供了架构:
kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
--topic test \
--from-beginning \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"long"}'
但我收到“未知的魔法字节!”错误与org.apache.kafka.common.errors.SerializationException
是否可以使用 Confluence kafka-avro-console-consumer 使用来自 Kafka 的 Avro 消息,这些消息未使用 Confluence 中的 AvroSerializer 和 Schema 注册表进行序列化?
Confluence Schema Registry 序列化器/反序列化器使用有线格式 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format其中在消息的初始字节中包含有关架构 ID 等的信息。
如果您的消息尚未使用架构注册表序列化程序进行序列化,那么您将无法使用它反序列化它,并且将得到Unknown magic byte!
error.
因此,您需要编写一个消费者来提取消息,使用 Avro avsc 架构进行反序列化,然后假设您想要保留数据,使用架构注册表序列化器 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#sr-serializer-and-formatter
Edit:我最近写了一篇文章,更深入地解释了整个事情:https://www.confluence.io/blog/kafka-connect-deep-dive-converters-serialization-explained https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)