Kafka s3 连接“值不是结构类型”错误

2024-03-04

我使用以下参数加载 s3 连接器:

confluent load s3-sink
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "some_bucket",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "schema.compatibility": "NONE",
    "partition.field.name": "f1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "name": "s3-sink"
  },
  "tasks": [
    {
      "connector": "s3-sink",
      "task": 0
    }
  ],
  "type": null
}

接下来我用 kafka-console- Producer JSON 发送它:

{"f1":"partition","data":"some data"}

我在连接日志中收到以下错误:

[2018-05-16 16:32:05,150] ERROR Value is not Struct type. (io.confluent.connect.storage.partitioner.FieldPartitioner:67)
[2018-05-16 16:32:05,150] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not re
cover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
io.confluent.connect.storage.errors.PartitionException: Error encoding partition.

我记得前一段时间它起作用了。
现在我使用 Confluence Open Source v.4.1


从 Confluence 4.1 版本开始FieldPartitioner 不支持 JSON https://github.com/confluentinc/kafka-connect-storage-common/issues/32场提取。

你可以改为使用kafka-avro-console-producer https://docs.confluent.io/3.3.1/quickstart.html使用 Avro Schema 发送相同的 JSON blob,那么它应该可以工作

这是您要使用的属性

--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"data","type":"string"}]}'

然后你就可以发送

{"f1":"partition","data":"some data"}

您需要在 Connect 中使用这些属性

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka s3 连接“值不是结构类型”错误 的相关文章

  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • Kafka 分区键无法正常工作

    我正在努力解决如何正确使用分区键机制的问题 我的逻辑是设置分区号为3 然后创建三个分区键为 0 1 2 然后使用分区键创建三个KeyedMessage 例如 KeyedMessage 主题 0 消息 KeyedMessage 主题 1 消息
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • kafka ProducerRecord 和 KeyedMessage 有什么区别

    我正在衡量卡夫卡生产者生产者的表现 目前我遇到了两个配置和用法略有不同的客户 Common def buildKafkaConfig hosts String port Int Properties val props new Proper
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late

随机推荐

  • 多种差异工具

    我已经设置了我的git使用 P4Merge 作为 diff 工具 如上所述here http progit org book ch7 1 html So git diff将触发 P4Merge 然而 有时我发现使用 UNIXdiff更快 更
  • 如何在postgis中找到多边形内的所有点?

    我将位置存储在 location table point location 几何 中 现在我在谷歌地图上绘制一个多边形并将该多边形 几何 传递到后端 我想找到该多边形内的所有位置 SELECT POINT LOCATION FROM LOC
  • 如何找到对“已知”列表进行排序的最佳堆栈移动集?

    关于未知列表的排序问题 人们已经了解很多 但是寻找最优排序的问题呢 已知名单在堆栈机中 也就是说 假设您有以下堆栈机 4 1 3 2 即有3堆空间 其中1堆填满了数字 此外 假设您的堆栈机可以执行 2 个动作 move a b 放置顶部元素
  • 在 Symfony2 中向删除表单添加“确认选项”的最佳方法是什么?

    如果您使用控制台为 Symfony2 中的实体创建 CRUD 代码 您最终将得到一个非常基本的删除功能 此功能简洁高效 但不提供 您确定吗 确认 如果要删除的实体存在 则立即删除 有人对添加用户确认的最简单方法有建议吗 到目前为止我一直在使
  • 您如何搜索/访问用户数据?

    我想提醒正在注册的用户 他们选择的电子邮件地址已在我们的用户库中 在他们注册之前 同时 如何将他们 所需的 电子邮件与我的用户列表中的用户进行比较 用户到底存储在哪里以及如何访问该数据节点 您应该有一个用户节点 其中包含您的用户以及有关他们
  • 在 silverlight 中使用 MVVM 模式设置可见性

    我在银光下取了一个网格 最初 textbox2 是不可见的 当我单击 textbox1 时 我们必须看到 textbox2 我尝试如下
  • 如何使用 PrimeFaces 验证码?

    我经历了用户指南 http www primefaces org documentation html和showcase http www primefaces org showcase ui captcha jsf但找不到在支持 bean
  • RxSwift:立即交付第一个项目,对后续项目进行反跳

    我有一个文本字段需要验证 我想在用户键入时禁用按钮 用户停止输入后 1 秒去抖 将执行验证并根据结果有条件地启用按钮 请注意当用户仅键入一个字符时的极端情况 验证仍然应该发生 a ab abc ab a ab false validate
  • 在camerax中捕获没有音频的视频

    我想使用camerax api在应用内相机中捕获没有音频的视频 我使用的是beta10的camerax api 我尝试过设置音频缓冲区大小和音频源等 截至此答案尚不支持 但当使用camerax库完全发布视频录制时 这将是可能的 最好的方法是
  • 如何对星期几进行直方图并具有字符串标签

    我有一个日期数据框 日期对象 见底部 我试图将它们转换为星期几 然后绘制直方图 但理想情况下标签是 星期一 星期日 不是数字 我有两个不同的问题 这很容易将日期对象转换为星期几 https stat ethz ch pipermail r
  • Rails 服务器错误:Ruby 版本是 1.8.7,但您的 Gemfile 指定为 1.9.3

    我输入了现有的 ruby 应用程序 然后输入 导轨 想在这里启动rails服务器 但它说 您的 Ruby 版本是 1 8 7 但您的 Gemfile 指定为 1 9 3 事实上 我有一个1 8 7 但我把它删除了 如果我这样做 红宝石 v
  • 我想在matlab中计算两行的平均值

    我在 matlab 中有一个 1028 x 18 矩阵 我想在 Matlab 中计算第一行和第二行按列值 第三行和第四行等的平均值 并得到一个具有平均值的新矩阵 我想你想计算每对行的列平均值 将数组重塑为 2 x 18 1028 2 计算平
  • RenderScript 支持库在 x86 设备上崩溃

    我正在运行致命异常android support v8 renderscript 在 Razor i x86 设备 上 如果我使用 问题就会消失android renderscript ARM设备也没有问题 这是例外情况 03 03 18
  • Spyder IDE 中的重复日志条目和锁定日志文件

    我想要的 我的 python 脚本运行 将日志消息输出到控制台和文件 一旦 python 脚本完成运行 我希望能够删除 编辑日志文件 我在 Windows7 上使用 Spyder IDE 示例代码 import logging loggin
  • Paypal 沙盒帐户电子邮件确认

    我在开发者网站上创建了一个沙箱帐户 当帐户所在国家 地区不在列表中时 一切都很好 然后 我通过 创建测试帐户 网站中的链接创建了另一个帐户 该帐户所在的国家 地区不在之前的列表中 它的电子邮件未经确认 我找不到激活它的方法 没有它我就无法接
  • 从 Dropzone 中删除任何现有文件会显示 dictDefaultMessage

    我创建了一个显示服务器上现有文件的放置区 我添加了有效的删除链接 我的问题是 当我使用删除链接删除文件时 默认的 将文件拖到此处上传 消息会出现在拖放区中 即使仍然有缩略图 我已经关注了这个tutorial http www startut
  • 自定义验证属性:比较同一模型中的两个属性

    有没有一种方法可以在 ASP NET Core 中创建自定义属性来验证一个日期属性是否小于模型中的其他日期属性ValidationAttribute 可以说我有这个 public class MyViewModel Required Com
  • quartz.NET 的任何开源管理 UI

    是否有任何开源管理界面可以在 QUARTZ NET 调度程序中添加 编辑 删除作业和触发器 看看这个blog http bugsquash blogspot com 2010 06 embeddable quartznet web cons
  • Magento:SQLSTATE [23000]:违反完整性约束:1062 键“UNQ_SALES_FLAT_ORDER_IN”的重复条目“100000001”

    我已经安装了 Magento 1 9 0 1 并且已经上线 1 个月了 客户的第一个订单没有出现任何问题 但现在 当应该处理订单时 会出现以下错误消息 处理您的订单时出错 请联系我们或稍后重试 日志文件说 异常 PDOException 消
  • Kafka s3 连接“值不是结构类型”错误

    我使用以下参数加载 s3 连接器 confluent load s3 sink name s3 sink config connector class io confluent connect s3 S3SinkConnector task