我正在使用 Apache Beam 的 kafkaIO 来读取 Confluence 模式注册表中具有 avro 模式的主题。
我能够反序列化消息并写入文件。但最终我想写信给 BigQuery。我的管道无法推断架构。
如何提取/推断架构并将其附加到管道中的数据,以便我的下游进程(写入 BigQuery)可以推断架构?
下面是我使用架构注册表 url 来设置反序列化器以及从 Kafka 读取数据的代码:
consumerConfig.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
options.getSchemaRegistryUrl());
String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withConsumerConfigUpdates(consumerConfig)
.withValueDeserializer(valDeserializerProvider)
.withKeyDeserializer(ByteArrayDeserializer.class)
.commitOffsetsInFinalize()
.withoutMetadata()
);
我最初认为这足以让 Beam 推断模式,但事实并非如此,因为 hasSchema() 返回 false。
任何帮助,将不胜感激。
有正在进行的工作 https://github.com/apache/beam/pull/10978支持 Avro 模式的推断,存储在 Confluence Schema 注册表中,位于KafkaIO
。不过,现在也可以在用户管道代码中执行此操作。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)