如何将 PCollection 转换为 python 数据流中的列表

2023-11-30

我有一个 PC 合集P1包含 ID 字段。我想从 PCollection 中获取完整的 ID 列作为列表,并将该值传递给 BigQuery 查询以过滤一个 BigQuery 表。

执行此操作最快且最优化的方法是什么?

我是数据流和大数据的新手。任何人都可以对此提供一些提示吗?

Thanks!


根据我从你的问题中了解到的,你想要根据你拥有的 ID 构建 SQL 语句P1。这是如何实现这一目标的一个示例:

sql = """select ID from `table` WHERE ID IN ({})"""
with beam.Pipeline(options=StandardOptions()) as p:
         (p | 'Create' >> beam.Create(['1', '2', '3']) 
            | 'Combine' >> beam.combiners.ToList()
            | 'Build SQL' >> beam.Map(lambda x: sql.format(','.join(map(lambda x: '"' + x + '"', x))))
            | 'Save' >> beam.io.WriteToText('results.csv'))

Results:

select ID from `table` WHERE ID IN ("1","2","3")

操作beam.combiners.ToList()将整个 PCollection 数据转换为单个列表(我稍后将其用于注入 SQL 占位符)。

您现在可以使用文件中的 SQLresults.csv-00000-to-000001针对 BQ 运行此查询。

我不确定是否可以直接在 PCollection 中运行此查询(类似于(p | all transformations | beam.io.Write(beam.io.BigQuerySink(result sql)))。我想从最终结果文件中读取然后针对 BQ 发出查询将是这里的最佳方法。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 PCollection 转换为 python 数据流中的列表 的相关文章

随机推荐