从 JSON 到 Avro 的 Kafka 流

2024-01-02

我尝试使用 Kafka Stream 将带有 String/JSON 消息的主题转换为另一个主题(作为 Avro 消息)。

流主要方法:

    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 

    final KStreamBuilder builder = new KStreamBuilder();


    final Serde<String> stringSerde = Serdes.String();

    builder.stream(stringSerde, stringSerde, "testin")
            .mapValues(value -> AvroUtil.transform(value))
            .to("testout");

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

转型:

public static GenericRecord transform(Object value) {

    // ... parse string/json and generate Avro object

    String userSchema = "{\"type\":\"record\"," +
            "\"name\":\"myrecord\"," +
            "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(userSchema);
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1", "value1");

    return avroRecord;
}

并得到这样的异常:

Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonProperty.access()Lcom/fasterxml/jackson/annotation/JsonProperty$Access;
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findPropertyAccess(JacksonAnnotationIntrospector.java:229)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:545)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.fromMemberAnnotationsExcept(POJOPropertyBuilder.java:996)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.findAccess(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.removeNonVisible(POJOPropertyBuilder.java:623)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._removeUnwantedAccessor(POJOPropertiesCollector.java:697)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:298)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:222)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:355)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest.toJson(RegisterSchemaRequest.java:76)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:58)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:90)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)

这是正确的方法吗?我是 Kafka Streams 和 Avro 的新手


只是缺少 Jackson 依赖项:

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.7.8</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.7.8</version>
    </dependency>

现在可以了

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 JSON 到 Avro 的 Kafka 流 的相关文章

  • 在Java Servlet中获取通过jquery ajax发送的参数[重复]

    这个问题在这里已经有答案了 我在网上搜索这个主题 但找不到有效的示例 我会很高兴有人能给我帮助 这就是我测试的 ajax url GetJson type POST dataType json contentType application
  • CURL 相当于使用 VBA 的 POST JSON 数据

    我知道这与之前提出的一些问题类似 但有些东西仍然对我不起作用 如何执行以下命令 curl X POST data statements json H Content Type application json user username p
  • 如何使用 PHP 通过 JSON 发送 HTML 元素?

    以下功能 try query this gt pdo gt prepare SELECT FROM bookings WHERE TourID AND dTourDate and Status NOT LIKE Cancelled quer
  • 如何对数组的数组进行 JSON_MODIFY?

    我的结构看起来像这样 Declare layout NVARCHAR MAX N Sections SectionName Section1 SectionOrder 1 Renders RenderName Render1 RenderO
  • Web API 2.0 使用 pascalcase 模型接收驼峰式命名的 JSON 数据

    我正在尝试对我的 Web API 进行 PUT 调用 我在 WebApiConfig cs 中设置了以下内容 以处理以驼峰形式将数据发送回我的 Web 项目 config Formatters JsonFormatter Serialize
  • 我如何浏览 json?

    我有一些在对象中的 JSON 但我似乎可以返回 json 示例的值 如下所示 rootLayout main layoutDescriptions id main container type Tabs content type Panel
  • 从 Json 纯 JavaScript 创建表

    我有一个带有多个可以更改的键的 Json 如下所示 Var children num 6 name me phone 7 num 8 name him phone 9 我想要一个带有标题的表格 号码 姓名 电话 我怎样才能只用 JavaSc
  • “JSONArray 文本必须在 null 的第 1 个字符处以 '[' 开头”

    只是想知道这个错误可能意味着什么 我从下面的代码中得到它 try JSONArray jArray new JSONArray result for int i 0 i
  • Apache Camel 的 JsonMappingException

    我在骆驼路线上遇到以下异常 Caused by com fasterxml jackson databind JsonMappingException No serializer found for class org apache cam
  • Elm:如何从 JSON API 解码数据

    我有这个数据使用http jsonapi org http jsonapi org format data type prospect id 1 attributes provider user id 1 provider facebook
  • 使用 System.Text.Json 序列化记录成员

    我在记录中使用自我引用成员 如下所示 type Payload Id Guid member x DerivedProperty Derived Property using id x Id NewtonSoft Json会序列化这个 但是
  • JSON 转换带有整数键的 Map

    我有一个测试代码的小样本 我尝试将 Map 转换为 JSON 字符串并返回 在解析 JSON 字符串时 结果映射包含字符串键 1 而不是整数键 1 从而导致测试失败 用作此映射的键的 POJO 也会发生同样的情况 这是预期行为还是我省略了
  • Bug 组合:jQuery 1.4、ajax/json、Firebug Lite 和 IE 8

    我刚刚得出结论 无论我如何尝试 jQuery 的 ajax 调用都无法在 IE 8 中处理 JSON 数据 我发现我可以使用 jQuery 1 3 2 库 这解决了问题 但 1 4 根本无法处理 JSON ajax 请求 即使返回的 JSO
  • 解码Json数据数组并插入到mysql

    这个问题可能已经在这里问过 但我尝试搜索找不到它 我有如下 Json 数据 CityInfo CityCode 5599 Name DRUSKININKAI CityCode 2003 Name KAUNAS CityCode 2573 N
  • 如何绕过 CF8 编码不可打印字符中的 SerializeJSON?

    SerializeJSON 创建具有不可打印字符 即 ASCII 21 的 JSON 这是无效的 JSON 我该如何解决这个问题 删除不可打印字符的正则表达式会起作用吗 什么正则表达式会删除不可打印的字符 嗯 这个简单的解决方案是为 cff
  • 在 Node.js 中创建 JSON 数组

    我需要在用 Node js 编写的服务器中创建一个 JSON 字符串 以便在请求时发送到客户端 问题是这个 JSON 取决于服务器中的可用数据 因此 JSON 数组的大小并不总是相同 我已经尝试了一整天 但尽管我感觉很接近 但我仍然不明白
  • 反序列化动态 JSON 文件 C# NewtonSoft.JSON

    正在反序列化一个动态 JSON 文件 该文件可能包含 2 个单独的类 我不知道数组中将包含哪种类型的数据 问题是 我将根对象反序列化为 Base 类型 subtests 对象被反序列化为 Subtest 但 subtests 数组可能是 B
  • Elasticsearch GET API 获取分片大小

    在 Elasticsearch 2 3 3 中 有没有办法使用返回 JSON 的 GET API 获取分片大小 目前我找到了以下几种获取shard size的方法 这两种方法都存在问题 recovery gt 使用 JSON 进行响应并提供
  • Json.dump 失败并显示“必须是 unicode,而不是 str”TypeError

    我有一个 json 文件 其中恰好有大量中文和日文 以及其他语言 字符 我将其加载到我的 python 2 7 脚本中使用io open如下 with io open multiIdName json encoding utf 8 as j
  • Rspec 控制器测试,传递 JSON 参数

    我试图实现以下目标 在 RSpec 控制器测试中创建 POST json 请求 并向其传递参数 这是我的代码 it returns access token do post login email bla password bla1 for

随机推荐

  • EF6 急切加载相关实体的单个属性

    在 EF6 中 我有一个实体客户 具有实体地址的导航属性 地址实体包含属性 城市 我可以急切地加载地址实体 同时获取所有客户 如下所示 dbSet Customers Include customer gt customer Address
  • 从 HttpWebRequest 下载时缺少某些元素?

    我正在使用 httpwebrequest 从给定的 url 下载数据 但很少有元素没有响应 Dim Request As HttpWebRequest CType WebRequest Create https www royalmail
  • 如何使InnoDB表在服务器重启时不重置自动增量?

    我的开发机器上运行的是 MySQL 5 5 37 我使用 innodb 表 面临下一个问题 服务器重新启动后自动增量重置 Found 自动锁模式 http dev mysql com doc refman 5 1 en innodb par
  • Twitter Open Graph 图像未显示

    我正在尝试在网站中实现 OpenGraph 这是我的元数据代码
  • Laravel 方法分页不存在

    我正在尝试对模型结果进行分页 但收到 分页方法不存在 这是我的代码 user dispatches Dispatch all gt where user id Auth id gt paginate 10 我需要获取用户 ID 等于当前经过
  • 我可以在 Three.js 中隐藏网格的面吗?

    我想使网格的某些部分在运行时不可见 我可以将这些部分设置为不可见 透明吗 通过改变单个面孔的属性 网格本身仅使用一种材料 示例说明 as the editor理解这个问题 想象一个网格 这里有 20 个顶点的几何图形 其中每个四个顶点的四边
  • DSPack - 如何获取声音输出的默认设备?

    在 Windows 7 中 有多个播放设备 示例 在我的笔记本电脑上 扬声器和双耳机 独立双耳机 SPDIF 通过 HP Dock 进行数字输出 情况是这样的 我正在编写一个应用程序 让用户选择输出设备并将其保存到应用程序的设置中 因此 它
  • Java 相当于 Cocoa 委托/Objective-C 非正式协议?

    Java 中 Cocoa 委托的等价物是什么 我知道我可以将一个接口传递给一个类 并让该类调用适当的方法 但我想知道是否有其他方法可以实现更接近 Cocoa Objective C 的非正式协议的东西 简而言之 Java 中没有任何东西能像
  • 如何在 Visual Studio 2013 中创建数据库项目

    我正在将包含一些 C 项目的解决方案从 Visual Studio 2008 迁移到 VS2013 迁移进行得很顺利 进行了一些调整 但是还有一个 dbp 项目 数据库项目 来自 VS2008 它拒绝在 VS2013 中迁移 加载 我在这个
  • 为什么通过 newCachedThreadPool 创建的 ExecutorService 是邪恶的?

    保罗 泰玛推介会 http paultyma blogspot com 2008 03 writing java multithreaded servers html有这一行 Executors newCachedThreadPool万恶
  • 在 OpenCV 中查找轮廓的“fitLine”

    我是一个在流中查找轮廓的程序 例如 我想找到可以描述这个轮廓的 点集 就像红线一样 黄色部分是轮廓的矩 我尝试使用fitLineopencv 的函数 但结果是无意义的 任何想法如何获得轮廓的中线 这应该代表我的 Contours 的关键方面
  • 有没有一种方法可以为 sendfile 编写不直接使用系统调用指令的 shellcode?

    我正在处理一个类似 ctf 的挑战 它正在过滤我的 shellcode 以确保我没有分别具有 syscall sysenter 和 int 指令 0x0f05 0x0f34 和 0x80cd 的十六进制值编码 基本上我有一个可以打开文件的
  • 哈希码的理解

    哈希函数对于实现哈希表很重要 我知道在java中 对象有其哈希码 该哈希码可能是由弱哈希函数生成的 以下是 补充哈希函数 的一个片段 static int hash Object x int h x hashCode h h lt lt 9
  • 使用 Node.js 提供网页时如何摆脱 .html 扩展名?

    我是 Node js 的初学者 正在使用 Express 和 ejs 布局 我想知道如何在发布页面时摆脱 html 扩展名 例如 如果我转到我的 localhost 3000 about html 这可以工作 但我希望它显示为 about
  • 无需 ctrl+空格即可自动完成

    Every time I must use CTRL SPACE for get complete variants But I need it often I need list of variants every time when I
  • 在 iOS 中将项目添加到阅读列表

    是否可以从应用程序将项目 url 添加到 Safari 中的阅读列表 我环顾四周 但只能找到信息如何手动添加 http mac tutsplus com tutorials tips shortcuts quick tip how to u
  • MVC API 控制器的复杂对象未通过 jquery ajax 调用填充

    我正在尝试调用 POST API 控制器 控制器被调用 但复杂对象是空的 我已经运行了 Fiddler 该对象甚至已经填充在那里 我究竟做错了什么 我的 C 对象 createdUser已包含 Web Api 所需的正确格式的数据 无需将其
  • 为什么我们使用完 System.out Stream 后不关闭它呢?

    我只是想知道 我们通常会在最后关闭流 但为什么我们不关闭System out打印流与System out close 如果关闭它 您将无法再写入控制台 因此当进程终止时 让我们将此任务留给虚拟机 您应该只关闭您拥有或手动创建的流 Syste
  • 在 IntelliJ IDEA 中为 scala 项目附加源

    我有一个带有 Scala 的 Playframework 2 项目 非常小的一个 它用斯卡拉异常 https github com playframework Play20 wiki ScalaAnorm图书馆 我有这样的代码 packag
  • 从 JSON 到 Avro 的 Kafka 流

    我尝试使用 Kafka Stream 将带有 String JSON 消息的主题转换为另一个主题 作为 Avro 消息 流主要方法 streamsConfiguration put StreamsConfig KEY SERDE CLASS