Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题

2023-12-09

我有一个主题,最终会有很多不同的模式。目前它只有一个。 我已经通过 REST 创建了一个连接作业,如下所示:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}

然后,我使用字符串键和 avro 序列化负载向该主题推送一条消息。如果我在控制中心检查该主题,我会看到正确的反序列化数据。 查看连接实例的输出,尽管我在日志中看到了这一点

RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

从这里你可以看到有两个相关的问题:

  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

让我烦恼的是,我已将策略指定为 RecordNameStrategy,我认为应该使用魔术字节来获取模式而不是主题名称,但它在未找到主题上出错。我不确定它实际上是在寻找主题名称还是通过 ID 获取架构。 无论哪种方式,都可以通过 ssh-ing 到连接实例并执行卷曲来http://schema-registry-service:8081/schemas/ids/7我确实得到了返回的架构。 此堆栈跟踪上方有一些额外的日志记录,令人失望的是它看起来仍然使用错误的名称策略:

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

有谁知道如何解决这个问题?我正在使用以下图像:

  • confluenceinc/cp-kafka-connect:5.2.0
  • confluenceinc/cp-kafka:5.1.0

Thanks


在踪迹中,lookUpSubjectVersion意味着它试图在下面进行查找/subjects/:name/versions对于那里列出的每个 ID,然后找不到schemaId=7 (Note: notversion=7),虽然从日志中不太清楚什么:name它试图在这里使用,但如果没有找到,那么你会得到你的Subject not found错误。如果我的PR被接受了,主题名称会更清楚

我相信这可能是由于使用RecordNameStrategy. 查看该房产的 PR,我认为它实际上只是针对生产者/消费者代码进行了测试,而不是在 Connect API 中进行了彻底的测试。与默认行为相比TopicNameStrategy

其中,你可以看到它尝试使用

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

但仔细一看,我认为你可能配置错误。

类似于你的方式value.converter.schema.registry.url,你实际上需要设置value.converter.value.subject.name.strategy反而。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题 的相关文章

  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 使用kafka lib反序列化PRIMITIVE AVRO KEY

    我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
  • Kafka 中的“__consumer_offsets”主题是什么

    当我运行此命令时 我得到 2 个主题 我知道我创建了测试主题 但我看到了一个名为 consumer offsets 的附加主题 从名称上看 它与消费者抵消有关 但它是如何使用的呢 bin kafka topics sh list zooke
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • Avro 架构和生成的文件中的十进制数据类型支持

    这个问题涉及 Avro 版本 1 8 1 我们的 AVRO 模式中有以下字段 name sale price type bytes null logicalType decimal precision 18 scale 17 如您所见 该字
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区

随机推荐

  • 在 PHP 中调整图像大小的智能方法[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 我想知道是否有人可以帮助我
  • 有这样的RTSP Ping吗?

    我目前正在开发一个 WinForm 应用程序 使用 C 中的 RTSP 协议从 IP 摄像机流式传输视频 一切都很好 该应用程序的部分要求包括检查网络摄像机是否在线的功能 因此 我使用 System Net NetworkInformati
  • 《Head First Design Patterns》一书中的接口与接口关联

    这本书首先设计模式将以下 UML 作为观察者模式的示例 这张图中让我印象深刻的是之间的关联关系Subject and Observer接口 据我了解Java接口 它们不能以这种方式实现 Has a 关系 当我查看几页后提供的实现示例时 我发
  • 如何在Python中对嵌套列表的外部和内部子列表进行排序?

    首先 如果这太天真 我深表歉意 我是初学者 我有以下类型的列表列表 我想首先按内部列表的最后一个成员按升序排序 data 1 45 0 2 49 2 3 98 0 4 82 1 5 77 1 6 98 2 我通过使用以下方法来实现此目的 s
  • 具有不同内容的跨路由的通用组件

    我有一个名为Header它存在于所有路线中 而应用程序的其余部分则发生变化 为了实现这一点 我的主要渲染代码如下所示 使用 ES6 render return div div
  • 如何检查用户是否登录

    我创建了一个登录页面 用户必须提供用户名和密码才能访问某些特定资源 他们可以在其中上传图像 或者只是编辑一些有关自己的描述 我的 web config 文件如下所示
  • php:获取ip地址

    我想获取访客的IP地址 你能告诉我什么元素吗 SERVER 我应该使用 SERVER HTTP CLIENT IP SERVER HTTP X FORWARDED FOR or SERVER REMOTE ADDR UPDATE 如果您的客
  • Xcode 4 中的这些图标代表什么?

    我以前从未见过这些 但是文件浏览器中文件名旁边的小 A 和 M 是做什么用的 让我根据SVN的知识猜测一下 A gt 新添加的文件 M gt 修改现有文件
  • 如何在 Titanium JS 中创建带有按钮的标题栏?

    我在用着钛合金构建一个应用程序 我尝试创建一个带有按钮的标题栏 类似于联系人应用程序 如下图所示 该标题的标题位于中间 按钮位于任一站点 我一直在到处寻找一种在钛中做到这一点的方法 但我还没有找到任何东西 文档中似乎没有这个内容 我需要创建
  • 如何在配置单元中保留驼峰式大小写的列名

    选择 12345 作为 EmpId 输出是 empid 值为 12345 有任何线索可以保持与 EmpId 相同的列名吗 不可能 这是 HIVE 元存储的限制 它以全小写形式存储表的模式 Hive 使用此方法来标准化列名称 请参阅表 jav
  • 内部访问修饰符与私有访问修饰符

    两者有什么区别internal and privateC 中的访问修饰符 internal适用于程序集范围 即只能从同一 exe 或 dll 中的代码访问 private适用于类范围 即只能从同一类中的代码访问
  • 为什么char数据的地址不显示?

    class Address int i char b string c public void showMap void void Address showMap void cout lt lt address of int lt lt i
  • 没有 Web 服务器的 Spring Boot

    我有一个简单的 Spring Boot 应用程序 它从 JMS 队列获取消息并将一些数据保存到日志文件中 但不需要 Web 服务器 有没有办法在没有Web服务器的情况下启动Spring Boot 春季启动 2 x 3 x 应用程序属性 sp
  • Laravel 4 不刷新

    我在 laravel 4 中遇到一个奇怪的问题 因为每次我尝试刷新页面时都不会出现更改 肯定不是浏览器的缓存 任何帮助表示赞赏 我遇到了同样的问题并找到了答案 尝试在 php ini 中禁用 OPcache 如果您使用MAMP 可以在 Ap
  • 隐藏已编译应用程序可执行代码的实践

    反编译和逆向工程 net 程序集是一种标准做法 我想发布一些将添加到现有应用程序的插件程序集 但我不希望它们被其他人使用 有哪些方法可以隐藏这些程序集的来源 除非控制目标硬件 否则理论上不可能实现 100 的保护 如果 CPU 能够执行它
  • 咖啡 | solver.prototxt值设置策略

    在 Caffe 上 我正在尝试实现一个用于语义分割的全卷积网络 我想知道是否有一个具体的策略来设置你的 solver prototxt 以下超参数的值 测试迭代器 测试间隔 迭代大小 max iter 这是否取决于您的训练集的图像数量 如果
  • c语言中绝对值的写法

    我知道该解决方案很丑陋并且在技术上不正确 但我不明白为什么代码不起作用 include
  • 包括 ACL 条件下的功能

    我有一个名为 MedicalFile 的资产 其中包含对组织的引用 参与者 HealthCareProfessional 也属于一个组织 现在我想定义一个 ACL 规则 限制医疗保健专业人员只能查看 MedicalFile 与其组织连接的医
  • Java 中 JESS 的输出

    我想将 事实 发送到java中的JESS文件并获取结果 我基本上对 JESS 文件进行批处理 然后通过 add 将我的数据 此处的结构 发送到引擎中 我试图将 JESS 结果 应该是一个字符串 转换为 值 Rete engine new R
  • Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题

    我有一个主题 最终会有很多不同的模式 目前它只有一个 我已经通过 REST 创建了一个连接作业 如下所示 name com mycompany sinks GcsSinkConnector auth2 config connector cl