如何从 Apache Beam KafkaIO 中的 kafka 主题推断 avro 架构

2024-02-11

我正在使用 Apache Beam 的 kafkaIO 来读取 Confluence 模式注册表中具有 avro 模式的主题。 我能够反序列化消息并写入文件。但最终我想写信给 BigQuery。我的管道无法推断架构。 如何提取/推断架构并将其附加到管道中的数据,以便我的下游进程(写入 BigQuery)可以推断架构?

下面是我使用架构注册表 url 来设置反序列化器以及从 Kafka 读取数据的代码:

    consumerConfig.put(
                        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
                        options.getSchemaRegistryUrl());

String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();

ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
            ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);

pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers(options.getKafkaBrokers().get())
                        .withTopics(Utils.getListFromString(options.getKafkaTopics()))
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(valDeserializerProvider)
                        .withKeyDeserializer(ByteArrayDeserializer.class)

                        .commitOffsetsInFinalize()
                        .withoutMetadata()

        );

我最初认为这足以让 Beam 推断模式,但事实并非如此,因为 hasSchema() 返回 false。

任何帮助,将不胜感激。


正在进行的工作 https://github.com/apache/beam/pull/10978支持 Avro 模式的推断,存储在 Confluence Schema 注册表中,位于KafkaIO。不过,现在也可以在用户管道代码中执行此操作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Apache Beam KafkaIO 中的 kafka 主题推断 avro 架构 的相关文章

  • 如何在Jenkins上更改工作空间并建立记录根目录?

    我希望将 Jenkins 的数据写入驱动器 E 因为这是服务器上的大型驱动器 Jenkins 本身安装在 C 上 我怎么做 我看到的默认配置是 工作区根目录 ITEM ROOTDIR 工作区 构建记录根目录 ITEM ROOTDIR 构建
  • 以 UTF8 而不是 UTF16 输出 DataTable XML

    我有一个 DataTable 我正在使用 WriteXML 创建一个 XML 文件 尽管我在以 UTF 16 编码导出它时遇到问题 并且似乎没有明显的方法来更改它 我了解 NET 在字符串内部使用 UTF 16 这是正确的吗 然后 我通过
  • 错误:模块“html”不提供视图引擎(Express)

    我正在尝试设置一个简单的路由应用程序 但在渲染页面时不断遇到错误 Error Module html does not provide a view engine 奇怪的是我已经在 app js 文件中指定了视图引擎 但仍然收到错误 app
  • Javascript/jQuery 外部高度()

    Does idOfLememt outerHeight 对所有浏览器产生相同的结果 IE7 有什么不同吗 只要去http api jquery com outerHeight http api jquery com outerHeight
  • 服务器响应 PASV 命令返回的地址与建立 FTP 连接的地址不同

    System Net WebException 服务器响应 PASV 命令返回的地址与建立 FTP 连接的地址不同 在 System Net FtpWebRequest CheckError 在 System Net FtpWebReque
  • Maven 构建错误 TOOLS.JAR NOT FOUND IN JRE

    我在构建 Maven 项目时遇到这个问题 请帮我解决 ERROR Failed to execute goal org apache maven plugins maven compiler plugin 2 5 1 compile def
  • Android 的代码覆盖率[重复]

    这个问题在这里已经有答案了 可能的重复 Android测试代码覆盖率 Eclipse https stackoverflow com questions 3282702 android test code coverage eclipse
  • 关闭扫描仪是否会影响性能

    我正在解决一个竞争问题 在问题中 我正在使用扫描仪获取用户输入 这是 2 个代码段 一个关闭扫描器 一个不关闭扫描器 关闭扫描仪 import java util Scanner public class JImSelection publ
  • UWP 应用程序在与商店关联后崩溃

    我正在为 Windows 创建一个 cordova 应用程序 将应用程序与商店关联后 应用程序起始页变为白色空白 如果应用程序使用包标识名称 com something moretext 则该应用程序可以正常工作 但我的商店包身份名称是 5
  • Maven2继承

    如果我有一个父 pom 并且想将其继承到多个项目 我通常通过添加到项目顶部来做到这一点
  • 纯旧 PHP 对象 (POPO) 一词的确切含义是什么?

    我想了解一下波波 我搜索了 popo 发现它代表 Plain Old Php Object 但我不确定 Plain Old Php Object 的确切含义 我想知道什么是 popo 以及在哪里使用它 谢谢 普通旧 在此处插入语言 对象是一
  • 如何用LoaderManager自动重新查询

    我有一个应用程序显示来自 SQLite DB 的数据 并且数据不断变化 所以显然 我认为我应该使用 LoaderManager 来显示数据 我读过一些关于将 LoaderManager 与 SQLite 结合使用的内容 然后看到了亚历克斯
  • JavaScript 中“键”的类型是什么?

    当我失去焦点并开始思考一个愚蠢的问题时 我遇到了这样的时刻 var a b value b 的类型是什么 我的意思不是 值 的类型 而是标记为 b 的实际键 背景 当我必须创建一个字符串键时 我开始想知道这一点 var a b value
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m
  • 如何在 Symfony 4 中为测试环境设置数据库

    我对如何在 symfony 4 中为测试环境设置数据库感到困惑 我曾经在配置测试 ymlsymfony 3 及以下版本中的文件 最佳做法是什么 我应该重新创建一个学说 yaml文件输入配置 包 测试 该文档提到如何通过编辑 phpunit
  • 尝试了解天蓝色云服务中的负载平衡

    我正在维护一个天蓝色的云服务 它有 1 个 Web 角色和几个辅助角色 该网络角色有多个实例 当我从资源中打开云服务时 我可以看到服务端点和公共IP地址 我想了解这个蔚蓝云服务中的流量负载是如何平衡的 我搜索了负载均衡器 但在订阅中找不到它
  • 将 read.csv 与符号链接文件一起使用

    我正在尝试做什么 我的源文件非常大 我想避免将其复制到其他文件夹中 我决定创建一个指向大文件的符号链接并想使用read csv读取文件 文件夹结构 项目1 数据 源文件 csv 项目2 数据 别名到源文件 csv 什么地方出了错 读取源文件
  • 为什么 FMA _mm256_fmadd_pd() 内在函数有 3 个 asm 助记符:“vfmadd132pd”、“231”和“213”?

    有人可以向我解释一下为什么融合乘法累加指令有 3 种变体 vfmadd132pd vfmadd231pd and vfmadd213pd 而只有一个 C 内在函数 mm256 fmadd pd 为了简单起见 在 AT T 语法中 有什么区别
  • 如何使用 C# 以低分辨率形式提供高分辨率图像

    尝试使用 300dpi tif 图像在网络上显示 目前 当用户上传图像时 我正在动态创建缩略图 如果创建的页面引用宽度为 500x500px 的高分辨率图像 我可以使用相同的功能即时转换为 gif jpg 吗 将创建的 jpg 的即将分辨率
  • GAE 无法部署到 App Engine

    我正在尝试从 Eclipse 发布 Web 应用程序 我在 GAE 上创建了四个项目 可以通过登录我的帐户并查看控制台来查看它们 我已经改变了appengine web xml到项目的应用程序 ID 如果我将其更改为 GAE 上第一个创建的

随机推荐