如何使用多个工作人员加速批量导入谷歌云数据存储?

2023-12-23

我有一个基于 apache-beam 的数据流作业可以使用VCF源 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/vcfio.py从单个文本文件(存储在谷歌云存储中),将文本行转换为数据存储Entities并将它们写入数据存储接收器 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py。工作流程工作正常,但我注意到的缺点是:

  • 数据存储的写入速度最多约为每秒 25-30 个实体。
  • 我尝试使用--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100但执行似乎更喜欢一个工作人员(见下图:目标工作人员曾经增加到 2 个,但“基于当前运行步骤中并行工作的能力”减少到 1 个)。

我没有使用祖先路径作为键;所有实体都是相同的kind.

管道代码如下所示:

def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
  (p
   | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
   | 'Create my entity' >> beam.ParDo(
     ToEntityFn(), user_options.kind)
   | 'Write to datastore' >> WriteToDatastore(project))

因为我有数百万行要写入数据存储,所以以 30 个实体/秒的速度写入会花费太长时间。

问题:输入只是一个巨大的 gzip 压缩文件。我需要将其拆分成多个小文件来触发多个worker吗?还有其他方法可以加快导入速度吗?我错过了什么吗num_workers设置?谢谢!


我对apache beam不熟悉,答案是从一般流程的角度来看。

假设各个输入文件部分中的实体数据之间没有依赖关系,那么是的,使用多个输入文件肯定会有所帮助,因为所有这些文件都可以虚拟地并行处理(当然,取决于可用的最大数量)工人)。

You might不需要预先分割巨大的zip文件,如果与实际数据段处理相比,这种切换本身的开销可以忽略不计,则可以简单地将单个输入数据流的各个段移交给单独的数据段工作人员进行写入。

总体性能限制是读取输入数据、将其分割成段并移交给段数据工作人员的速度。

数据段工作器将其接收的数据段进一步分割成更小的块,最多相当于最多 500 个实体,这些实体可以在单个批处理操作中转换为实体并写入数据存储。根据所使用的数据存储客户端库,可能可以异步执行此操作,从而允许继续拆分为块并转换为实体,而无需等待先前的数据存储写入完成。

数据段工作器的性能限制将是数据段被分割成块以及块转换为实体的速度

如果异步操作不可用或无法获得更高的吞吐量,则可以将每个块再次移交给段工作程序,由段工作程序执行到实体的转换和数据存储批量写入。

数据段工作器级别的性能限制将只是数据段被分割成块并移交给块工作器的速度。

通过这种方法,对实体的实际转换以及将它们批量写入数据存储(异步或非异步)将不再位于分割输入数据流的关键路径中,我相信这是当前方法中的性能限制。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用多个工作人员加速批量导入谷歌云数据存储? 的相关文章

随机推荐