How should I implement the following logic, taken from https://beam.apache.org/documentation/pipelines/design-your-pipeline/:
// merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
That is, how do I combine multiple PCollections into a single PCollection in the Apache Beam Python API?
You can use the Flatten transform (https://beam.apache.org/documentation/programming-guide/#core-beam-transforms) in Python as well. For example:
data1 = ['one', 'two', 'three']
data2 = ['four', 'five']
input1 = p | 'Create PCollection1' >> beam.Create(data1)
input2 = p | 'Create PCollection2' >> beam.Create(data2)
merged = (input1, input2) | 'Merge PCollections' >> beam.Flatten()
The merged PCollection will contain the following elements (PCollections are unordered, so the order may vary):
INFO:root:one
INFO:root:two
INFO:root:three
INFO:root:four
INFO:root:five
Full code:
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class LogFn(beam.DoFn):
    """Logs each element and passes it through unchanged."""

    def process(self, element):
        logging.info(element)
        # Yield rather than return: a returned string would be treated as an
        # iterable and emitted one character at a time.
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    data1 = ['one', 'two', 'three']
    data2 = ['four', 'five']

    input1 = p | 'Create PCollection1' >> beam.Create(data1)
    input2 = p | 'Create PCollection2' >> beam.Create(data2)

    # Flatten takes a tuple (or list) of PCollections and merges them into one.
    merged = (input1, input2) | 'Merge PCollections' >> beam.Flatten()
    merged | 'Check Results' >> beam.ParDo(LogFn())

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()