计算一次 GroupBy,然后将其传递给 Google DataFlow (Python SDK) 中的多个转换

2024-05-09

我正在使用适用于 Apache Beam 的 Python SDK 在 Google DataFlow 上运行特征提取管道。我需要运行多个转换,所有这些转换都希望项目按键分组。

基于这个答案question https://stackoverflow.com/questions/51203221/optimizing-repeated-transformations-in-apache-beam-dataflow,DataFlow 无法自动发现并重用像 GroupBy 这样的重复转换,因此我希望先运行 GroupBy,然后将结果 PCollection 提供给其他转换(请参阅下面的示例代码)。

我想知道这是否应该在 DataFlow 中有效地工作。如果没有,Python SDK 中推荐的解决方法是什么?是否有一种有效的方法可以让多个 Map 或 Write 转换获取同一 GroupBy 的结果?就我而言,我观察到 DataFlow 在利用率为 5% 时扩展到最大工作线程数,并且在 GroupBy 之后的步骤中没有取得任何进展,如此处所述question https://stackoverflow.com/questions/55401268/a-simple-counting-step-following-a-group-by-key-is-extremely-slow-in-a-dataflow.

示例代码。为简单起见,仅显示 2 个变换。

# Group by key once.
items_by_key = raw_items | GroupByKey()

# Write groupped items to a file.
(items_by_key | FlatMap(format_item) | WriteToText(path))

# Run another transformation over the same group.
features = (items_by_key | Map(extract_features))

单台喂料输出GroupByKey步骤进入多个转换应该可以正常工作。但是您可以获得的并行化程度取决于原始数据中可用键的总数GroupByKey步。如果任何一个下游步骤具有高扇出,请考虑添加一个改组 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L632在这些步骤之后将允许 Dataflow 进一步并行执行。

例如,

pipeline | Create([<list of globs>]) | ParDo(ExpandGlobDoFn()) | Reshuffle() | ParDo(MyreadDoFn()) | Reshuffle() | ParDo(MyProcessDoFn())

Here,

  • ExpandGlobDoFn:扩展输入全局并生成文件
  • MyReadDoFn:读取给定文件
  • MyProcessDoFn:处理从文件中读取的元素

我用了两个Reshuffle在这里(请注意Reshuffle has a GroupByKey其中)允许(1)并行读取给定 glob 中的文件(2)并行处理给定文件中的元素。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

计算一次 GroupBy,然后将其传递给 Google DataFlow (Python SDK) 中的多个转换 的相关文章

随机推荐