带有 kafka-avro-console-consumer 的未知魔法字节

2024-04-26

我一直在尝试将 Confluence 中的 kafka-avro-console-consumer 连接到我们的旧版 Kafka 集群,该集群是在没有 Confluence Schema Registry 的情况下部署的。 我使用以下属性显式提供了架构:

kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
    --topic test \
    --from-beginning \
    --property key.schema='{"type":"long"}' \
    --property value.schema='{"type":"long"}'

但我收到“未知的魔法字节!”错误与org.apache.kafka.common.errors.SerializationException

是否可以使用 Confluence kafka-avro-console-consumer 使用来自 Kafka 的 Avro 消息,这些消息未使用 Confluence 中的 AvroSerializer 和 Schema 注册表进行序列化?


Confluence Schema Registry 序列化器/反序列化器使用有线格式 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format其中在消息的初始字节中包含有关架构 ID 等的信息。

如果您的消息尚未使用架构注册表序列化程序进行序列化,那么您将无法使用它反序列化它,并且将得到Unknown magic byte! error.

因此,您需要编写一个消费者来提取消息,使用 Avro avsc 架构进行反序列化,然后假设您想要保留数据,使用架构注册表序列化器 https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#sr-serializer-and-formatter

Edit:我最近写了一篇文章,更深入地解释了整个事情:https://www.confluence.io/blog/kafka-connect-deep-dive-converters-serialization-explained https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

带有 kafka-avro-console-consumer 的未知魔法字节 的相关文章

  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 卡夫卡高级消费者 error_code=15

    当尝试使用高级消费者 使用全新的消费者组 从 Kafka 进行消费时 消费者永远不会开始运行 当我将日志记录级别切换为调试时 我可以看到以下两行一遍又一遍地重复 DEBUG AbstractCoordinator 09 43 51 192
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入

随机推荐