Apache Kafka JDBC 连接器 - SerializationException:未知的魔术字节

2023-12-11

我们尝试使用 Confluence JDBC Sink Connector 将主题中的值写回到 postgres 数据库。

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

我们可以使用以下命令读取控制台中的值:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name

架构存在并且值已正确反序列化kafka-avro-console-consumer因为它没有给出错误,但连接器给出了这些错误:

  {
  "name": "datawarehouse_sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "x.x.x.x:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "x.x.x.x:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
    }
  ],
  "type": "sink"
}

最终的错误是:

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

该模式在模式注册表中注册。

问题是否出在连接器的配置文件上?


错误org.apache.kafka.common.errors.SerializationException: Unknown magic byte!表示有关该主题的消息无效 Avro,无法反序列化。这可能有几个原因:

  1. 有些消息是 Avro,但其他消息不是。如果是这种情况,您可以使用 Kafka Connect 中的错误处理功能来忽略无效消息,配置如下:

    "errors.tolerance": "all",
    "errors.log.enable":true,
    "errors.log.include.messages":true
    
  2. The value是 Avro 但key不是。如果是这种情况,请使用适当的key.converter.

更多阅读:https://www.confluence.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Kafka JDBC 连接器 - SerializationException:未知的魔术字节 的相关文章

随机推荐

  • JavaScript for 循环索引奇怪[重复]

    这个问题在这里已经有答案了 我对 JS 比较陌生 所以这可能是一个常见问题 但我在处理 for 循环和 onclick 函数时注意到一些奇怪的事情 我能够用这段代码复制这个问题
  • 如何比较数组?还有改变属性?

    我是新的 ios 开发人员 我想比较和更改属性 数组1 对象1 对象2 对象3 对象4 数组2 对象2 对象4 对象5 对象8 比较数组 1 和数组 2 如果 Array2 中存在相同的对象 请更改对象中的属性 在上面的例子中 Object
  • 如何以 root 权限启动 QProcess?

    我需要启动gphoto2来自 Qt 程序 我这样做 QString gphotoProgram usr bin gphoto2 QStringList gphotoArguments gphotoArguments lt lt captur
  • 如何修复 Linux 中的“usr/bin/google-chrome 不再运行,因此 ChromeDriver 假设 Chrome 已崩溃”错误? [复制]

    这个问题在这里已经有答案了 我正在尝试设置一个 jenkins 服务器来在 Amazon Linux 上使用 Selenium webdriver 3 142 0 和 ruby 2 3 7 托管我的自动化框架 如果我尝试使用我的脚本调用 c
  • 从一堆对象中提取一个对象并检测边缘

    在我的大学项目中 我需要通过检测叶子的边缘来根据植物叶子的形状来识别植物的种类 我使用OpenCV 2 4 9和C 但源图像是在植物的真实环境中拍摄的 并且有不止一片叶子 请参阅下面的示例图片 所以这里我需要提取一片叶子的边缘图案来进一步处
  • 在 Visual Studio 环境中将第一个自定义对话框添加到 WIX

    我正在使用 Visual Studio 构建我的 wix 文件 到目前为止 我有一个文件 Product wxs 它可以进行简单的安装 现在我想添加一些自定义对话框 我认为从下面的两篇文章中 我了解了如何做到这一点 在我设置了环境之后 ht
  • 基底存储中私有变量的可能性

    是否可以将私有变量存储在底层存储中 特别是以以下形式存储并在私有函数中访问它们 derive Encode Decode Default Clone PartialEq Debug pub struct MyStruct id Hash t
  • python:ext4 文件系统中 os.path.exists 的复杂性?

    有谁知道 os path exists 函数在带有 ext4 文件系统的 python 中的复杂性是多少 使用的底层目录结构Ext4 and Ext3 与中完全相同Ext2 Ext3添加日记 Ext4改善日记 日记与你的问题无关 最初 Ex
  • 列出物理驱动器空间

    我有大约 200 台服务器 我需要获取磁盘空间和逻辑驱动器空间详细信息 可用空间 已用空间和总空间 这是我的 PowerShell 查询 infoObjects New Object PSObject foreach machine in
  • 在 Firefox 中使用 css 转换时边框渲染不正确

    我有一个像这样的简单 CSS 箭头 arrow brown height 18px width 18px border top 6px solid 39170b border right 6px solid 39170b moz trans
  • UITextView 或 UILabel Swift 上图像的自定义项目符号

    我正在创建一个应用程序 它基本上是一个大型网站的移动版本 该网站有一个特定区域列出了产品功能 每个功能都标有独特的自定义图像 有没有办法快速做到这一点 本质上 创建一个字符串项目符号列表 但使用小图像作为项目符号点 将 UITextView
  • 如何插入重新启动游戏选项?

    我希望在骰子游戏结束时有一个选项 上面写着 您想重新启动吗 是还是不是 如果用户输入 是 游戏就会重新启动 并且会无限次 直到用户玩够游戏并退出 我知道你可以用循环来做到这一点 但是怎么做呢 import random print Dice
  • Java程序如何获得自己的进程ID?

    如何获取我的 Java 进程的 ID 我知道有几种依赖于平台的技巧 但我更喜欢更通用的解决方案 不存在可以保证在所有 jvm 实现中工作的独立于平台的方法 ManagementFactory getRuntimeMXBean getName
  • sed 就地编辑文件

    如何编辑文件single sed命令 目前 我必须手动将编辑的内容流式传输到新文件中 然后将新文件重命名为原始文件名 I tried sed i 但我的 Solaris 系统说 i是一个非法的选择 有不同的方法吗 The i option无
  • 推送不会修改作为函数参数的列表[重复]

    这个问题在这里已经有答案了 我是 common lisp 的新手 所以希望有人能向我澄清这一点 假设我们有一个列表并想要添加一个项目push修改它 CL USER gt defparameter xx 1 2 3 XX CL USER gt
  • PHP odbc_fetch_array 字符串限制

    我有以下sql SELECT bw imp step as imp action FROM cm3rm1 m1 INNER JOIN cm3rm2 m2 ON m1 number m2 number WHERE m1 number id 当
  • 如何使用 MLlib 在 Spark 上生成(原始标签、预测标签)的元组?

    我正在尝试使用从 Spark 上的 MLlib 返回的模型进行预测 目标是生成 orinalLabelInData predictedLabel 的元组 然后这些元组可以用于模型评估目的 实现这一目标的最佳方法是什么 谢谢 假设 parse
  • 如何获取用户位置?

    我试图使用以下代码获取用户的当前位置 但它不起作用 我已经添加了两个NSLocationWhenInUseUsageDescription钥匙和NSLocationAlwaysUsageDescription我的钥匙Info plist f
  • 如何在Windows窗体应用程序中实现键盘按键

    我想在 C Windows 窗体应用程序中实现键盘按钮按下命令 假设如果达到某个值 我想使用 Windows 窗体应用程序按下 L 键 这可能吗 怎么做 这可能对你有用 但是 更好的方法可能是设置表单的KeyPreview财产给true 然
  • Apache Kafka JDBC 连接器 - SerializationException:未知的魔术字节

    我们尝试使用 Confluence JDBC Sink Connector 将主题中的值写回到 postgres 数据库 connector class io confluent connect jdbc JdbcSinkConnector