旁加载静态数据

2024-04-04

在 ParDo 中处理数据时,我需要使用存储在 Google Cloud Storage 上的 JSON 架构。我想这可能是侧面加载?我读了他们称之为文档的页面(https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html)并且它包含一些关于apache_beam.pvalue.AsSingleton and apache_beam.pvalue.AsSideInput但是如果我用谷歌搜索它们的用法,结果为零,并且找不到任何 Python 的示例。

如何从 ParDo 内的存储中读取文件?或者我是否在 ParDo 之前旁加载到我的管道,但如何在 ParDo 内利用第二个源?

[EDIT]

我的主要数据来自BQ:beam.io.Read(beam.io.BigQuerySource(...
侧面输入也来自BQ,使用相同的BigQuerySource.

当我在主数据侧输入其他数据之后添加一个步骤时,我收到一些奇怪的错误。我注意到当我这样做时beam.Map(lambda x: x)到侧面输入它可以工作。

侧面输入

schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
                         | beam.Map(lambda x: x)
                       )

主要数据

    source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))  

结合

validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))

我将使用您提到的文档作为库参考,并浏览 Beam 编程指南以获取更详细的演练:侧面输入部分 https://beam.apache.org/documentation/programming-guide/#side-inputs。我将尝试提供几个示例,其中我们将从公共表下载 BigQuery 架构并将其上传到 GCS:

bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
gsutil cp schema.json gs://$BUCKET

我们的数据将是一些没有标题的 csv 行,因此我们必须使用 GCS 模式:

data = [('NC', 'F', 2020, 'Hello', 3200),
        ('NC', 'F', 2020, 'World', 3180)]

使用侧面输入

我们将 JSON 文件读入schema电脑收藏:

schema = (p 
  | 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))

然后我们将它传递给ParDo作为侧面输入,以便将其广播给执行该命令的每个工作人员DoFn。在这种情况下,我们可以使用AsSingleton因为我们只想将模式提供为单个值:

(p
  | 'Create Events' >> beam.Create(data) \
  | 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
  | 'Log elements' >> beam.ParDo(LogElementsFn()))

现在我们可以访问schema in the process的方法EnrichElementsFn:

class EnrichElementsFn(beam.DoFn):
  """Zips data with schema stored in GCS"""
  def process(self, element, schema):
    field_names = [x['name'] for x in json.loads(schema)]
    yield zip(field_names, element)

请注意,最好进行模式处理(构造field_names),然后将其保存为单例以避免重复工作,但这只是一个说明性示例。


使用启动包

在这种情况下,我们不会将任何额外的输入传递给ParDo:

(p
  | 'Create Events' >> beam.Create(data) \
  | 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
  | 'Log elements' >> beam.ParDo(LogElementsFn()))

现在我们使用Python客户端库(我们需要安装google-cloud-storage)在每次工作人员初始化包时读取模式:

class EnrichElementsFn(beam.DoFn):
  """Zips data with schema stored in GCS"""
  def start_bundle(self):
    from google.cloud import storage

    client = storage.Client()
    blob = client.get_bucket(BUCKET).get_blob('schema.json')
    self.schema = blob.download_as_string()

  def process(self, element):
    field_names = [x['name'] for x in json.loads(self.schema)]
    yield zip(field_names, element)

两种情况下的输出是相同的:

INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]

使用 2.16.0 SDK 和DirectRunner.

两个示例的完整代码here https://gist.github.com/gxercavins/1b731afaa4be9b1c8784112e491d744b.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

旁加载静态数据 的相关文章

随机推荐