Kafka JDBC Sink 连接器:未分配任务

2023-11-25

我尝试使用以下配置启动 JDBC 接收器连接器:

{
    "name": "crm_data-sink_hh",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 6,
        "topics": "crm_account,crm_competitor,crm_event,crm_event_participation",

        "connection.url": "jdbc:postgresql://db_host/hh?prepareThreshold=0",
        "connection.user": "db_user",
        "connection.password": "${file:db_hh_kafka_connect_pass}",
        "dialect.name": "PostgreSqlDatabaseDialect",

        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "guid",

        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,

        "errors.deadletterqueue.topic.name":"crm_data_deadletterqueue",
        "errors.deadletterqueue.context.headers.enable":true
    }
}

但是当连接器处于运行状态时没有任务正在运行:

curl -X GET http://kafka-connect:10900/connectors/crm_data-sink_hh/status
{"name":"crm_data-sink_hh","connector":{"state":"RUNNING","worker_id":"172.16.24.14:10900"},"tasks":[],"type":"sink"}

我多次遇到这个问题,但我很困惑,因为它是随机发生的。我的问题与this问题。我将不胜感激任何帮助!


更新。 11/04/2019(不幸的是,现在我只有INFO级别的日志)

最后,经过几次尝试,我通过更新现有连接器的配置来启动连接器来运行任务crm_data-sink_db_hh:

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X PUT -d @new_config.json http://docker21:10900/connectors/crm_data-sink_db_hh/config -H 'Content-Type: application/json'

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"UNASSIGNED","worker_id":"192.168.1.198:10900"},"tasks":[],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"172.16.36.11:10900"},"tasks":[{"state":"UNASSIGNED","id":0,"worker_id":"172.16.32.11:10900"},{"state":"UNASSIGNED","id":1,"worker_id":"172.16.32.11:10900"},{"state":"RUNNING","id":2,"worker_id":"192.168.2.243:10900"},{"state":"UNASSIGNED","id":3,"worker_id":"172.16.32.11:10900"},{"state":"UNASSIGNED","id":4,"worker_id":"172.16.32.11:10900"}],"type":"sink"}

$ curl -X GET http://docker61:10900/connectors/crm_data-sink_db_hh/status
{"name":"crm_data-sink_db_hh","connector":{"state":"RUNNING","worker_id":"192.168.1.198:10900"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":1,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":2,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":3,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":4,"worker_id":"192.168.1.198:10900"},{"state":"RUNNING","id":5,"worker_id":"192.168.1.198:10900"}],"type":"sink"}

Log:

[2019-04-11 16:02:15,167] INFO Connector crm_data-sink_db_hh config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:15,668] INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:15,668] INFO Stopping connector crm_data-source (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping task crm_data-source-0 (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping connector crm_data-sink_pandora (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:15,668] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2019-04-11 16:02:15,668] INFO Stopping table monitoring thread (io.confluent.connect.jdbc.JdbcSourceConnector)
...
Stopping connectors and tasks 
...
[2019-04-11 16:02:17,373] INFO 192.168.1.91 - - [11/Apr/2019:13:02:14 +0000] "POST /connectors HTTP/1.1" 201 768  2468 (org.apache.kafka.connect.runtime.rest.RestServer)
[2019-04-11 16:02:20,668] ERROR Graceful stop of task crm_data-source-1 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-3 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] ERROR Graceful stop of task crm_data-source-2 failed. (org.apache.kafka.connect.runtime.Worker)
[2019-04-11 16:02:20,669] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,669] INFO [Worker clientId=connect-1, groupId=21] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-04-11 16:02:20,681] INFO Tasks [crm_data-sink_hhru-0, crm_data-sink_hhru-3, crm_data-sink_hhru-4, crm_data-sink_hhru-1, crm_data-sink_hhru-2, crm_data-sink_hhru-5, crm_data-pandora_sink-0, crm_data-pandora_sink-2, crm_data-pandora_sink-1, crm_data-pandora_sink-4, crm_data-pandora_sink-3, crm_data-pandora_sink-03-0, crm_data-pandora_sink-03-2, crm_data-pandora_sink-03-1, crm_data-pandora_sink-00-1, crm_data-pandora_sink-00-0, crm_data-pandora_sink-00-3, crm_data-pandora_sink-00-2, crcrm_data-pandora_sink-00-4, crm_data-sink_hh-00-0, crm_data-sink_hh-00-1, crm_data-sink_hh-00-2, crm_data-pandora_sink-test-3, crm_data-pandora_sink-test-2, crm_data-pandora_sink-test-4,crm_data-pandora_sink-01-2, crm_data-pandora_sink-01-1, crm_data-pandora_sink-01-0, crm_data-source-3, crm_data-source-2, crm_data-source-1, crm_data-source-0, crm_data-sink_db_hh-0, crm_data-sink_db_hh-1, crm_data-sink_db_hh-2, crm_data-sink_hh-01-0, crm_data-sink_hh-01-1, crm_data-sink_hh-01-2, crm_data-sink_hh-01-3, crm_data-sink_hh-00-3, crm_data-sink_hh-00-4, crm_data-sink_hh-00-5, crm_data-sink_hh-1, crm_data-sink_hh-0, crm_data-sink_hh-3, crm_data-sink_hh-2, crm_data-sink_hh-5, crm_data-sink_hh-4, crm_data-sink_pandora-5, crm_data-sink_pandora-0, crm_data-sink_pandora-1, crm_data-sink_pandora-2, crm_data-sink_pandora-3, crm_data_account_on_competitors-source-0, crm_data-sink_pandora-4] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,681] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,682] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,683] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,684] INFO Tasks [] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO [Worker clientId=connect-1, groupId=21] Successfully joined group with generation 2206465 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-04-11 16:02:20,685] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-57140c1d-3b19-4fc0-b4ca-e6ce272e1924', leaderUrl='http://192.168.1.198:10900/', offset=1168, connectorIds=[crm_data-sink_db_hh, crm_data-source, crm_data-sink_pandora], taskIds=[crm_data-source-0, crm_data-source-1, crm_data-source-2, crm_data-source-3, crm_data-sink_pandora-0, crm_data-sink_pandora-1, crm_data-sink_pandora-2, crm_data-sink_pandora-3, crm_data-sink_pandora-4, crm_data-sink_pandora-5]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connectors and tasks using config offset 1168 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-sink_db_hh (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-source (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting connector crm_data-sink_pandora (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2019-04-11 16:02:20,685] INFO Starting task crm_data-source-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
...
Starting connectors and tasks
...

更新。 12/04/2019

我提高了日志级别并重现了该问题。我看到很多不同任务的记录(已删除连接器的任务或尚未运行的任务),如下所示:

 [2019-04-12 15:14:32,360] DEBUG Storing new config for task crm_data-sink_hh-3 this will wait for a commit message before the new config will take effect. New config: {...} (org.apache.kafka.connect.storage.KafkaConfigBackingStore)

任务列表中有已删除连接器的任务 - 可以吗? Kafka Connect的内部主题也有同样的情况。

我的主要问题:如果没有任务因任何原因运行,为什么连接器没有失败?由于连接器在这种情况下实际上不起作用。


它看起来像是 Kafka Connect 本身的一个错误。有一个卡夫卡吉拉ticket关于这个问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka JDBC Sink 连接器:未分配任务 的相关文章

  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 如何在 Spring Kafka 中以编程方式设置 Jsonserializer Type Value 方法

    所以我无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法 还不确定原因 但与此同时 我如何以编程方式设置它 我在文档中看到了它的代码 但是该代码到底需要在哪里运行 Spring Kafka JsonDese
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues

随机推荐

  • 所有子类的 C++ 模板专门化

    我需要创建一个像这样的模板函数 template
  • 如何在 Rails 中发现模型属性?

    我发现很难轻松地查看所有模型类中存在哪些属性 属性 因为它们没有在我的类文件中明确定义 为了发现模型属性 我保持 schema rb 文件打开 并在它和我根据需要编写的任何代码之间切换 这可以工作 但很笨重 因为我必须在读取架构文件以获取属
  • Google 地图 api-3:更改多边形的默认光标

    例如 我可以更改地图的draggableCursor 但即使我更改它 多边形的光标仍然是指针 因为地图位于多边形后面 我想将多边形的光标设置为 移动 以便明确多边形是可拖动的 更改多边形光标的正确方法是什么 有一个属性或方法可以做到这一点吗
  • 在 ASP.NET Core 中使用防伪 cookie,但使用非默认 CookieName

    我正在考虑更改 ASP NET Core 中默认防伪 cookie 的名称 我想更改 cookie 名称的原因是为了使 cookie 匿名化 在我看来 最终用户没有理由能够确定此 cookie 的责任 Microsoft AspNetCor
  • 如何使用 JSF2 处理多态性?

    我需要显示 编辑多态实体 我的抽象类是Person 我的具体课程是自然人 and 有道德的人 每个具体类都有自己的自定义属性 如何根据实体类使用适当的显示 编辑 复合 组件 谢谢 不存在这样的事情instanceof在EL 但是 您可以 a
  • NHibernate 通过代码映射 (Loquacious) - 级联选项

    我对使用 NHibernate 按代码映射时的级联枚举选项行为有疑问 枚举有以下选项 Flags public enum Cascade None 0 Persist 2 Refresh 4 Merge 8 Remove 16 Detach
  • 如何根据类别计数过滤数据框

    如何对数据帧进行子集化 以便仅包含包含其值在其他行中显示一定次数的列的行 例如 如果我有一个标记为 食物 的列 我将如何过滤掉在整个数据框中出现少于 5 次的食物的所有行 这是一个简单的例子 dat lt data frame x runi
  • v8 |手动启动垃圾收集器

    有没有办法在 Google V8 引擎上手动启动垃圾收集器 我找不到任何参考 通常GC都支持这个功能 你可以暴露v8 HEAP gt CollectAllGarbage函数到 通过命令标志的全局 JavaScript 命名空间 expose
  • Linux 系统的 OPEN_MAX 在哪里定义?

    OPEN MAX是定义单个程序允许的最大打开文件数的常量 According to Beginning Linux Programming 4th Edition Page 101 该限制通常由 limit h 中的常量 OPEN MAX
  • Laravel RoleMiddleware,未找到类角色

    我正在尝试添加一个简单的中间件来检查用户是否与角色匹配 当我使用中间件时遇到问题 出现异常 ReflectionException 类角色不存在 我不会尝试调用名为 role 的类 因此我认为这在 Laravel 的某个地方神奇地发生了 我
  • 我的 javascript webApp 首先读取一个短的 mp3 文件并在其中找到静音间隙 用于导航目的 然后播放相同的 mp3 文件 提示它从一个静音或另一个静音结束的位置开始 这与通常的 webAudio 场景不同 通常的 webAu
  • 无法在 PHP 中连接 2 个数组

    我最近学习了如何在 PHP 中使用 运算符连接 2 个数组 但考虑一下这段代码 array array Item 1 array array Item 2 var dump array 输出是 数组 1 0 gt 字符串 6 项目 1 为什
  • 单击时如何不突出显示 NSButton 的模板图像?

    我在 NSTableView 的每一行都有 NSButtons 按钮图像在 IB 中设置 并且带 Alpha 通道的黑色图标 窗口设置为暗模式 window appearance NSAppearance named NSAppearanc
  • 如何在 Twitter Rest API v1.1 中从 FHSTwitterEngine 完全注销?

    FHSTwitterEngine engine FHSTwitterEngine sharedEngine engine clearAccessToken 我尝试了上面的代码 但是当我尝试再次登录时 文本字段不会出现在presentModa
  • 是否有 Perl 模块或技术可以更轻松地使用长命名空间?

    有些命名空间又长又烦人 假设我下载了名为 FooFoo BarBar BazBaz tar gz 的假设包 它具有以下模块 FooFoo BarBar BazBaz Bill FooFoo BarBar BazBaz Bob FooFoo
  • 为什么即使不调用公共复制构造函数也需要它?

    拥有一个公共复制构造函数将使小程序 编译 但不显示副作用 复制 include
  • 水平线上的三角形指针/边框

    我正在尝试在水平线上创建三角形指针 边框 这是我想要实现的目标的示例 我尝试操纵 div 的顶部边框 但到目前为止我所做的一切根本不起作用 有多种方法可以实现这一点 这可能取决于您的布局 一种解决方案是使用两侧带有边框的旋转元素 trian
  • PublicKeyCredential 无法序列化

    我正在 Angular 应用程序中实现 FIDO2 WebAuthn 我已经获得了 PublicKeyCredentialCreationOptions 对象和 seccessfullt 寄存器 但打电话后 let response awa
  • VB.Net - Excel COM 对象未释放[重复]

    这个问题在这里已经有答案了 我面临的问题是 即使在调用 ReleaseComObject 和 GC Collect 方法后 Excel Process 仍保持活动状态 我的 Excel 进程终止 但仅在我关闭用户表单后才终止 下面是示例代码
  • Kafka JDBC Sink 连接器:未分配任务

    我尝试使用以下配置启动 JDBC 接收器连接器 name crm data sink hh config connector class io confluent connect jdbc JdbcSinkConnector tasks m