I'm building a pipeline that reads roughly 5 million files from a Google Cloud Storage (GCS) directory. I have it configured to run on Google Cloud Dataflow.
The problem is that when I launch the pipeline, it spends hours "computing the size" of all the files:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
As you can see, it took an hour and a half (5,549 seconds) to compute the sizes of roughly 5.5 million files, and then it started all over again! It took another 2 hours to run the second pass, and then it started a third! As of this writing, the job is still not visible in the Dataflow console, which leads me to believe this is all happening on my local machine and not taking advantage of any distributed computing.
When I test the pipeline with a smaller input dataset (2 files), it repeats the size estimation 4 times:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
At this rate, performing the GCS size estimation of all 5.5 million files 4 times will take roughly 8 hours, all before the Dataflow job even starts.
My pipeline is configured with the --runner=DataflowRunner option, so it should be running in Dataflow:
python bigquery_import.py --runner=DataflowRunner #other options...
The pipeline reads from GCS as follows:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
# argv is passed in from run(argv=None); see the full script linked below.
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
See bigquery_import.py on GitHub (https://github.com/rviscomi/bigquery/blob/8ac58f72a2367305d080e406e81ef376db8a90f7/dataflow/python/bigquery_import.py#L208-L212) for the full code.
I'm confused as to why this tedious process happens outside of the Dataflow environment and why it needs to be done multiple times. Am I reading the files from GCS correctly, or is there a more efficient way?
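In case it helps clarify what I'm asking: one alternative I've been considering is to pass the glob pattern into the pipeline as data and expand/read it with ReadAllFromText instead of ReadFromText, so the file matching happens on the workers. I haven't verified that this actually avoids the local size estimation, and the labels below are just a sketch adapted from my current pipeline:

# Sketch only: defer glob expansion and reading to the workers.
# Assumes beam.io.ReadAllFromText behaves this way; untested at 5M-file scale.
with beam.Pipeline(options=pipeline_options) as p:
    files = (
        p
        | 'CreateGlob' >> beam.Create(['gs://project/dir/*.har.gz'])
        | 'ReadAll' >> beam.io.ReadAllFromText())

Would that be a reasonable direction, or is there a better-supported way to read this many files?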