我正在查看 incubator-beam 存储库上的 word_counting.py 示例(从数据流文档链接),我想修改它以获得n 出现次数最多的。这是我的管道:
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line
output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
我使用 Top.Of() 方法添加了一行,但它似乎返回一个将数组作为单个元素的 PCollection(我正在等待有序的 PCollection,但查看文档,PCollection 似乎是无序的集合。
当管道运行时,beam.Map 仅循环一个元素(即整个数组)并且在“format”中,lambda 函数会引发错误,因为它无法将整个数组映射到元组 (word,c)
我应该如何处理这个单元素 PCollection 而不会在这一步中断管道?
如果你想扩展一个PCollection
的可迭代对象变成PCollection
这些可迭代的元素,您可以使用FlatMap
,其参数是从元素到结果可迭代的函数:在您的情况下,元素本身就是可迭代的,因此我们使用恒等函数。
counts = ...
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
| 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!
output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
...
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)