KSQL 流 - 从结构数组中获取数据

2023-12-23

我的 JSON 看起来像:

{
  "Obj1": {
    "a": "abc",
    "b": "def",
    "c": "ghi"
  },
  "ArrayObj": [
    {
      "key1": "1",
      "Key2": "2",
      "Key3": "3",

    },
    {
      "key1": "4",
      "Key2": "5",
      "Key3": "6",

    },
    {
      "key1": "7",
      "Key2": "8",
      "Key3": "9",

    }
  ]

}

我已经编写了 KSQL 流将其转换为 AVRO 并保存到主题,以便我可以将其推送到 JDBC Sink 连接器

CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e; 

在 Example_Avro 中,我只能获取数组中的第一个对象。

当我在 KSQL 中点击 select * from Example_Avro 时,如何获取如下所示的数据?

  a    b   key1   key2  key3

  abc  def   1       2     3
  abc  def   4       5     6
  abc  def   7       8     9

测试数据(我删除了后面无效的逗号key3价值):

ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }

Query:

SELECT OBJ1->A AS A, 
       OBJ1->B AS B, 
       EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
       EXPLODE(ARRAYOBJ)->KEY2 AS KEY2, 
       EXPLODE(ARRAYOBJ)->KEY3 AS KEY3 
FROM   TEST4 
EMIT CHANGES;

Result:

+-------+-------+------+-------+-------+
|A      |B      |KEY1  |KEY2   |KEY3   |
+-------+-------+------+-------+-------+
|abc    |def    |1     |2      |3      |
|abc    |def    |4     |5      |6      |
|abc    |def    |7     |8      |9      |

在 ksqlDB 0.6 上测试,其中EXPLODE添加了功能。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KSQL 流 - 从结构数组中获取数据 的相关文章

  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • Kafka 适合运行公共 API 吗?

    我有一个想要发布的事件流 它被划分为主题 不断更新 需要水平扩展 并且没有 SPOF 很好 并且可能需要在某些情况下重播旧事件 所有的功能似乎都与 Kafka 的功能相匹配 我想通过任何人都可以连接并获取事件的公共 API 将其发布到全世界
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • KafkaConsumer.commitAsync() 行为的偏移量比以前更低

    kafka 将如何处理调用 KafkaConsumer commitAsync Map
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从

随机推荐