ParDo 中的侧面输出 | Apache Beam Python SDK

2024-04-05

由于该文档仅适用于 JAVA,我无法真正理解它的含义。

它指出 -“虽然 ParDo 始终生成一个主输出 PCollection(作为 apply 的返回值),但您也可以让 ParDo 生成任意数量的附加输出 PCollection。如果您选择有多个输出,您的 ParDo 将返回所有输出 PCollection(包括主输出)捆绑在一起。例如,在 Java 中,输出 PCollections 捆绑在类型安全的 PCollectionTuple 中。

我理解捆绑在一起的含义,但是如果我在 DoFn 中生成一个标签,它是否会生成一个所有其他输出为空的捆绑包,并在代码中遇到它们时生成其他输出?或者它等待所有的产量准备好输入,然后将它们全部打包在一起输出?

文档中对此没有太多说明。虽然我认为它不会等待,只是在遇到时屈服,但我仍然需要了解发生了什么。


回答这个问题的最好方法是举一个例子。这个例子是可用于光束 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py.

假设您要运行字数统计管道(例如,计算每个单词在文档中出现的次数)。为此,您需要将文件中的行分割成单独的单词。考虑到您还想单独计算单词长度。你的分割变换会像这样:

with beam.Pipeline(options=pipeline_options) as p:

    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    short_words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]

在这种情况下,每个都是不同的PCollection,具有正确的元素。这DoFn将负责分割其输出,并通过标记元素来实现。看:

class SplitLinesToWordsFn(beam.DoFn):
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      # yield word to add it to the main collection.
      yield word

正如您所看到的,对于主输出,您不需要标记元素,但对于其他输出则需要标记元素。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ParDo 中的侧面输出 | Apache Beam Python SDK 的相关文章

随机推荐