Kafka Connect 接收器任务忽略容差限制

2023-11-24

我尝试忽略接收器连接器中的错误消息errors.tolerance: all选项。完整的连接器配置:

{
    "name": "crm_data-sink_pandora",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account_detail,crm_account_on_competitors,crm_event,crm_event_participation",
        "connection.url": "jdbc:postgresql://dburl/service?prepareThreshold=0",
        "connection.user": "pandora.app",
        "connection.password": "*******",
        "dialect.name": "PostgreSqlDatabaseDialect",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",
        "table.name.format": "pandora.${topic}",
        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,
     "errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable":true
    }
}

目标表DDL:

create table crm_event_participation
(
  guid              char(36) not null
    constraint crm_event_participation_pkey
      primary key,
  created_on        timestamp,
  created_by_guid   char(36),
  modified_on       timestamp,
  modified_by_guid  char(36),
  process_listeners integer,
  event_guid        char(36),
  event_response    varchar(250),
  note              varchar(500),
  is_from_group     boolean,
  contact_guid      char(36),
  target_item       integer,
  account_guid      char(36),
  employer_id       integer
);

连接器成功启动,但如果发生错误(例如缺少字段),则会失败。

curl -X GET http://kafka-connect:9092/connectors/crm_data-sink_pandora/status:

{
    "name": "crm_data-sink_pandora",
    "connector": {
        "state": "RUNNING",
        "worker_id": "192.168.2.254:10900"
    },
    "tasks": [
        {
            "state": "FAILED",
            "trace": 
              "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
                 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                 at java.lang.Thread.run(Thread.java:748)
              Caused by: org.apache.kafka.connect.errors.ConnectException: Table \"pandora\".\"crm_event_participation\" is missing fields ([SinkRecordField{schema=Schema{STRING}, name='event_id', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='event_response_guid', isPrimaryKey=false}]) and auto-evolution is disabled
                 at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
                 at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
                 at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
                 at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
                 at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
                 at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
                 ... 10 more",
            "id": 0,
            "worker_id": "192.168.2.254:10900"
        }
        ...
    ]
}

记录异常:

[2019-03-29 16:59:30,924] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}] among column names [employer_id, modified_on, modified_by_guid, contact_guid, target_item, guid, created_on, process_listeners, event_guid, created_by_guid, is_from_group, account_guid, event_response, note] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-03-29 16:59:30,924] ERROR WorkerSinkTask{id=crm_data-sink_pandora-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Table "pandora"."crm_event_participation" is missing fields ([SinkRecordField{schema=Schema{INT32}, name='process_listners', isPrimaryKey=false}]) and auto-evolution is disabled at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:140)
  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

请解释一下连接器配置可能有什么问题?我使用 Kafka 2.0.0 和 JdbcSinkConnector 5.1.0。


在你的卡夫卡消息中你有一个字段process_listners。您的表中不存在具有该名称的列。

我认为你有错字。在表中你有列process_listeners, not process_listners.

errors.tolerance属性仅适用于转换消息期间的错误。 更多关于errors.tolerance你可以阅读:kafka连接-jdbc接收sql异常

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Connect 接收器任务忽略容差限制 的相关文章

随机推荐

  • HTML5 本地存储与会话存储

    除了非持久性和仅限于当前窗口之外 会话存储相对于本地存储还有什么好处 性能 数据访问等 吗 本地存储 and 会话存储两者都延伸Storage 除了预期的 非持久性 之外 它们之间没有任何区别sessionStorage 也就是说 数据存储
  • 接收远程服务器返回错误:(403) 禁止消息

    我在下面的代码块上收到 远程服务器返回错误 403 Forbidden 错误消息 具体来说 这一行失败了 var 响应 HttpWebResponse request GetResponse 该代码在我的开发机器上完美运行 但在生产中却无法
  • 将 DBRef 解析为 Json

    我在 MongoDB 的规范化数据模型结构中收到以下错误 org bson codecs configuration CodecConfigurationException Can t find a codec for class com
  • 从任何函数中提取函数参数和默认值

    有没有办法从任何给定函数中提取参数及其各自的默认值outside功能 例如 给定 myfunc lt function a b 1 print c a b 我正在寻找一些会返回的函数 list a NULL b 1 或其一些变体 您正在寻找
  • 指定分页页面 - Laravel 4

    我试图 记住 用户浏览记录时所在的页面 以便当他返回列表时 他会返回到他离开的页面 如何更改分页器的 当前页面 值 我尝试过 Input set page x 但没有这样的功能 GET 页面 x 也不起作用 这是代码 list Public
  • 将独立图例合并到 ggpairs 中(采取 2)

    tl dr无法获得独立的图例 描述整个绘图中的常见颜色 ggpairs令我满意 抱歉长度 我正在尝试使用绘制 下三角 对图GGally ggpairs 用于绘制各种绘图矩阵的扩展包ggplot2 这本质上是同一个问题如何向 ggpairs
  • Django Rest Framework:使用令牌身份验证时重定向到 Amazon S3 失败

    我在 DRF 中使用令牌身份验证 并且对于某个 API 调用 想要重定向到 S3 使用类似的 URLhttps my bucket s3 amazonaws com my file path my file jpg Signature MY
  • AttributeError:“池”对象没有属性“__exit__”

    我正在使用一些多处理Python脚本multiprocessing Pool 这些脚本如下所示 from multiprocessing import Pool def f x return x x if name main with Po
  • 我什么时候应该在菜单项中使用省略号

    我什么时候应该把 放在菜单项的末尾 我似乎记得读过一些规则 但我一辈子都找不到它们 对于上下文 我正在向右键单击菜单添加属性选项 并且想知道添加它们是否合适 据我了解 这表明该选项在实际执行任何操作之前会询问您其他问题 这 3 个点实际上称
  • Sql Server 2005 - 如果不存在则插入

    互联网上有很多关于这个常见 问题 的信息 解决方案如 IF NOT EXISTS BEGIN INSERT INTO END 在我看来 它们不是线程安全的 您可能会同意 但是你能确认将exists放入单选的where子句中就可以解决sql引
  • 如何从 App Store 上提供的 ipa 文件获取 dSYM 文件

    有没有办法从 App Store 中提供的 ipa 文件获取 dSYM 文件 我丢失了特别包含 dSYM 的档案 我可以这样做吗 我需要将 dSYM 上传到 Crittercism 提前致谢 假设您仍然可以访问 iTunes Connect
  • C# SIP 堆栈/库 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 目前不接受答案 我正在寻找一个好的 SIP 库 要么用 C 编写 要么提供 C 包装器 不一定需要免费 有人用过什么好东西吗 为了澄清起见 我说的是 VoIP 协议
  • 创建具有动态属性名称的对象[重复]

    这个问题在这里已经有答案了 我正在尝试这样做 var KEYS KEYS PHONE TYPE phone type KEYS AGENT TYPE agent type var myAppConfig iconMap KEYS PHONE
  • SQLAlchemy with_for_update 行锁定不起作用?

    有一个学生 他的type属性为 4 最小值为type属性可以是1 在 postgres 中 在会话 1 中 我独占锁定并更新学生表中的一行 BEGIN LOCK TABLE students IN ROW EXCLUSIVE MODE SE
  • 直接在SQLite中计算两点之间的距离

    在我的 web MySQL 应用程序中 我有类似的方法来获取两点之间的距离 6371 acos cos radians 19 83996 cos radians lat cos radians 43 94910 radians lng si
  • 如何从 preg_split 结果中删除空数组? [复制]

    这个问题在这里已经有答案了 举个例子 我有大量的正则表达式 就像我写的一样简单 php gt var dump preg split reg s reg a zA Z array 3 0 gt string 0 1 gt string 13
  • 裤子包括 OS X 特定的 Python 轮子

    TLDR Pants 获取 OS X 特定的轮子 因为我正在 Mac 上开发 我怎样才能避免这种情况 或者指定我将部署到 Ubuntu 完整故事 尝试用 Pants 打包 Python 应用程序 到目前为止进展顺利 但遇到了一个困扰我一段时
  • 以编程方式设置或查看“高级 Wifi”设置

    我需要一种以编程方式打开 高级 wifi 设置的方法 以让用户更改某些设置 或者最好以编程方式更改这些高级无线设置 到目前为止 我只能通过 startActivity new Intent Settings ACTION WIFI SETT
  • 什么样的类型定义在本地环境中是合法的?

    在伊莎贝尔的NEWS文件 我发现 命令 typedef 现在可以在本地理论上下文中工作 无需 引入对参数或假设的依赖 这不是 可以在 Isabelle Pure HOL 中实现 请注意 逻辑环境可能 包含本地 typedef 的多种解释 具
  • Kafka Connect 接收器任务忽略容差限制

    我尝试忽略接收器连接器中的错误消息errors tolerance all选项 完整的连接器配置 name crm data sink pandora config connector class io confluent connect