数据流:将 Top 模块与 Python SDK 结合使用:单元素 PCollection

2024-04-14

我正在查看 incubator-beam 存储库上的 word_counting.py 示例(从数据流文档链接),我想修改它以获得n 出现次数最多的。这是我的管道:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

我使用 Top.Of() 方法添加了一行,但它似乎返回一个将数组作为单个元素的 PCollection(我正在等待有序的 PCollection,但查看文档,PCollection 似乎是无序的集合。

当管道运行时,beam.Map 仅循环一个元素(即整个数组)并且在“format”中,lambda 函数会引发错误,因为它无法将整个数组映射到元组 (word,c)

我应该如何处理这个单元素 PCollection 而不会在这一步中断管道?


如果你想扩展一个PCollection的可迭代对象变成PCollection这些可迭代的元素,您可以使用FlatMap,其参数是从元素到结果可迭代的函数:在您的情况下,元素本身就是可迭代的,因此我们使用恒等函数。

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

数据流:将 Top 模块与 Python SDK 结合使用:单元素 PCollection 的相关文章

随机推荐

  • 一个月中的一周熊猫

    我试图每个月坚持一周 有些月份可能有四个星期 有些可能有五个星期 对于每个日期 我想知道它属于哪一周 我最感兴趣的是本月的最后一周 data pd DataFrame pd date range 1 1 2000 periods 100 f
  • 如何使用反射更改 kotlin 私有 val?

    我可以使用反射访问私有 val 值 如下所示 fun main val mainClass MainClass val f MainClass class memberProperties find it name info f let i
  • 一瞥让一切都慢了 50 倍

    我一直在使用glimpse来尝试解决一些页面速度慢的问题 结果发现glipse就是原因 页面请求超过 30000 秒 毫不夸张地说它们是即时的 所以我一直在追鬼 当导致如此速度差异时 我如何使用一瞥来查看一切需要多长时间 我是否配置错误或者
  • 如何在 iPad 硬件中(而不是在模拟器中)测试 iPad 应用程序

    在 iPad 模拟器上完成构建和测试后 我需要在 iPad 硬件上测试该应用程序 我怎样才能做到这一点 如果您已支付开发人员密钥的费用 则应该能够打开管理器窗口 设置您的设备 然后选择设备而不是模拟器作为 XCode 中的目标 看苹果的文档
  • 更高效的 matplotlib 堆积条形图 - 如何计算底部值

    我需要一些帮助 使用 matlibplot 在 python 中制作一组堆积条形图 我的基本代码如下 但我的问题是如何生成值bottom对于第二个之外的任何元素有效率的 我可以让示例图正确堆叠 始终从下到上为 a b c d import
  • 我应该在 OBDII 的 BLE IOS 设备中使用什么 BLE 特性

    您好 我想知道我应该从这个 OBDII BLE 设备 加密狗中使用什么写入和通知特性 我想在 Flutter 中创建一个适用于 IOS 的程序 有不少 Device name VEEPEAK Device id 34E2B2AF 60F4
  • 更改值结转次数的 maxgap

    我有一个类似于以下内容的数据框 library data table test lt data table data frame value c 5 NA 8 NA NA 8 6 NA NA 10 locf N c 1 NA 1 NA NA
  • google.script.run.withSuccessHandler() 返回未定义

    我使用下面提供的代码在单独的 GS 文件中创建了一个数组 我尝试在 HTML 文件中调用它 我的目标是将数组的内容与参数进行比较email 但是 返回的值google script run withSuccessHandler is und
  • 来自浏览器的带有正文的异步 GET 请求

    好吧 我知道这是一个坏主意 不应该这样做 但为了这个问题 请假设没有其他方法 我得到的 API 端点需要以空对象作为主体的 GET 请求 有没有办法从浏览器执行异步请求 我在用着axios使用的库XMLHttpRequest在引擎盖下和MD
  • 如何在Qt中暂时断开与插槽的信号?

    我用信号连接一个插槽 但现在我想暂时断开它们的连接 这是我的班级声明的一部分 class frmMain public QWidget private QTimer myReadTimer private slots void on btn
  • POST 请求(Javascript)

    如何在 Javascript 中发出简单的 POST 请求而不使用表单且不回发 虽然我从 sundeep 答案中获取代码示例 但为了完整性而将代码发布在此处 var url sample url php var params lorem i
  • 如何在 Django 1.8 中使用 jinja2 作为模板引擎

    我一直在研究如何在 django 1 8 中使用 jinja2 但是没有将 django 与 jinja2 一起使用的完整源代码 我想知道你们是否知道在 django 中使用 jinja2 的过程 我查看了官方文档并查看了以下问题 如何设置
  • 按 Option 键隐藏/显示应用程序主菜单中的菜单项

    我想在应用程序的主菜单中添加一个很少使用的菜单项 我希望它默认隐藏 仅当用户按住 Option 键时才显示 我该怎么做呢 看来我应该处理flagsChanged 但它是NSResponder的方法和NSMenu不继承自NSResponder
  • 为什么使用 boost 后 C++ 比 python 快得多?

    我的目标是用 Python 编写一个用于频谱有限元的小型库 为此我尝试使用 Boost 通过 C 库扩展 Python 希望它能让我的代码更快 class Quad public Quad int int double integrate
  • 将 TDD 与 Web 应用程序开发集成的最佳实践? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 单元测试和 ASP NET Web 应用程序在我的团队中是一个模棱两可的点 通常情况下 良好的测试实践会被忽视 Web 应用程序最终会在没有测试
  • 如何对库进行临时签名?

    尝试运行链接到动态库的可执行文件 出现以下错误 Library not loaded Reason tried
  • 为什么小于不起作用?

    这看起来很简单 但为什么这种比较不起作用呢 if nmax lt num nmax num 我把它放在一个循环中 寻找最大的数字 第一个数字是105 然后是89 然后是99 然后是一大堆大于99的数字 第一个数字是要测试的数字 第二个数字是
  • GWT:对RichTextArea进行文本限制并阻止用户输入更多字符

    我正在使用 GWT RixhText Area 并希望在 richText Area 中限制 100 个字符 现在我正在做这个 description addKeyDownHandler new KeyDownHandler Overrid
  • Elastic Beanstalk 剥离 Sec-WebSocket-Accept 标头

    我正在尝试让 NET Core 应用程序在 elastic beanstalk 上运行 以从浏览器中的 javascript 接收 websockets 连接 当我在本地计算机上测试 AWS 之外的客户端和服务器时 我能够在两者之间建立 W
  • 数据流:将 Top 模块与 Python SDK 结合使用:单元素 PCollection

    我正在查看 incubator beam 存储库上的 word counting py 示例 从数据流文档链接 我想修改它以获得n 出现次数最多的 这是我的管道 counts lines split gt gt beam ParDo Wor