我将使用您提到的文档作为库参考,并浏览 Beam 编程指南以获取更详细的演练:侧面输入部分 https://beam.apache.org/documentation/programming-guide/#side-inputs。我将尝试提供几个示例,其中我们将从公共表下载 BigQuery 架构并将其上传到 GCS:
bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
gsutil cp schema.json gs://$BUCKET
我们的数据将是一些没有标题的 csv 行,因此我们必须使用 GCS 模式:
data = [('NC', 'F', 2020, 'Hello', 3200),
('NC', 'F', 2020, 'World', 3180)]
使用侧面输入
我们将 JSON 文件读入schema
电脑收藏:
schema = (p
| 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))
然后我们将它传递给ParDo
作为侧面输入,以便将其广播给执行该命令的每个工作人员DoFn
。在这种情况下,我们可以使用AsSingleton
因为我们只想将模式提供为单个值:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们可以访问schema
in the process
的方法EnrichElementsFn
:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def process(self, element, schema):
field_names = [x['name'] for x in json.loads(schema)]
yield zip(field_names, element)
请注意,最好进行模式处理(构造field_names
),然后将其保存为单例以避免重复工作,但这只是一个说明性示例。
使用启动包
在这种情况下,我们不会将任何额外的输入传递给ParDo
:
(p
| 'Create Events' >> beam.Create(data) \
| 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
| 'Log elements' >> beam.ParDo(LogElementsFn()))
现在我们使用Python客户端库(我们需要安装google-cloud-storage
)在每次工作人员初始化包时读取模式:
class EnrichElementsFn(beam.DoFn):
"""Zips data with schema stored in GCS"""
def start_bundle(self):
from google.cloud import storage
client = storage.Client()
blob = client.get_bucket(BUCKET).get_blob('schema.json')
self.schema = blob.download_as_string()
def process(self, element):
field_names = [x['name'] for x in json.loads(self.schema)]
yield zip(field_names, element)
两种情况下的输出是相同的:
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]
使用 2.16.0 SDK 和DirectRunner
.
两个示例的完整代码here https://gist.github.com/gxercavins/1b731afaa4be9b1c8784112e491d744b.