如何在 python apache beam 中展平多个 Pcollection

2024-04-21

应该如何实现位于以下位置的以下逻辑:https://beam.apache.org/documentation/pipelines/design-your-pipeline/ https://beam.apache.org/documentation/pipelines/design-your-pipeline/:

//merge the two PCollections with Flatten//me 
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

由此可以将多个 PCollection 组合成一个 PCollection 在 apache beam python api 中?


您可以使用Flatten https://beam.apache.org/documentation/programming-guide/#core-beam-transforms也变身。例如:

data1 = ['one', 'two', 'three']
data2 = ['four','five']

input1 = p | 'Create PCollection1' >> beam.Create(data1)
input2 = p | 'Create PCollection2' >> beam.Create(data2)

merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

合并的 PCollection 将包含:

INFO:root:one
INFO:root:two
INFO:root:three
INFO:root:four
INFO:root:five

完整代码:

import argparse, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class LogFn(beam.DoFn):
  """Prints information"""
  def process(self, element):
    logging.info(element)
    return element


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)

  data1 = ['one', 'two', 'three']
  data2 = ['four','five']

  input1 = p | 'Create PCollection1' >> beam.Create(data1)
  input2 = p | 'Create PCollection2' >> beam.Create(data2)

  merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

  merged | 'Check Results' >> beam.ParDo(LogFn())

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 python apache beam 中展平多个 Pcollection 的相关文章

随机推荐

  • 如何使用同一模型的其他字段的值在 django 模型中创建字段?

    我想创建一个字段名称 total其中有所有产品的总价 数量 我想要的是我该怎么做total price quantity在 Django 模型中 正如您所看到的 可以有不止一种产品 我已通过 tabularinline 将 OderItem
  • Selenium Python 等待元素中出现文本错误显示需要 3 个参数 2 个给定

    我正在使用 WebdriverWait 等待网页上的元素中出现某些文本 我正在使用 Selenium 和 Python 我的语法不正确 我收到错误 类型错误 init 恰好需要 3 个参数 给定 2 个 错误跟踪 Traceback mos
  • 使用单个字符串查找多个文件路径

    我尝试编写一个批处理脚本来查找与输入字符串同名的文件的所有路径 现在它只能找到找到的第一个文件 我想不出一种方法让它列出多个文件位置 我经验不足 需要一些帮助 这是脚本代码的一部分 start cls echo Enter file nam
  • SQL-在一个字段中选择与另一字段中记录最高的不同记录

    在我有一个像这样的表的情况下 int id PK int staff id int skill id bit mainskill 我想为每位员工 由 Staff id 表示 仅选择一条记录 列出他们的主要技能 由 mainskill 中的
  • Visual Studio Code 中的 PHP 块快捷方式

    如何在 Visual Studio Code 中打开基本 PHP 块 如下所示 In Sublime Text https en wikipedia org wiki Sublime Text I simply type php and p
  • 时间格式说明(Google Directions API)

    我已阅读用于提出方向请求的 Google Directions API 文档 URL 的示例如下 http maps googleapis com maps api directions json origin Brooklyn desti
  • 在 Laravel 中使用 Socialite 登录后重定向到 URL

    我需要使用以下 URL 注册参加锦标赛 http laravel dev tournaments 1 register 该 URL 位于中间件 auth 中 因此如果用户未登录 他将被重定向到登录页面 我需要的是重定向到 http lara
  • 循环遍历多个 JObject 级别并将信息收集为字符串

    我使用以下代码从 URL 收集 Json 数据 var json new WebClient DownloadString http steamcommunity com id tryhardhusky inventory json 753
  • 使用 gdb 调试时彻底退出 valgrind

    我正在使用 valgrind 和 gdb 调试程序 然而 我以一种野蛮的方式终止了这些调试会话 这真的是它应该做的吗 设置调试会话 按照来自的指示valgrind 官方网站 http valgrind org docs manual man
  • 如何显示文件解压进度?

    我正在尝试找出一种方法来显示当前进度以及解压缩并将 zip 文件的内容写入磁盘的剩余时间 我目前正在使用此处找到的 ZipArchiver 类http code google com p ziparchive http code googl
  • Python pandas 插入长整型

    我正在尝试在 Pandas Dataframe 中插入长整数 import numpy as np from pandas import DataFrame data scores 6311132704823138710 273 26850
  • NEDB 文件存储在哪里?

    var Datastore require nedb db new Datastore filename testdb db autoload true var doc hello world n 5 today new Date nedb
  • 在 Google 电子表格上,如何称呼 IP 的城市、国家/地区?

    我想知道是否有一个公式 脚本可以在 Google 电子表格上使用来获取 IP 地址数组的城市 位置 也就是说 假设 A 列上的每个单元格都有 100 个 IP 地址 我应该在 B 列上使用什么公式 脚本来获取各自的城市和位置 最简单的方法是
  • Qt 调试器在 mac 上使用错误的 python 版本

    我使用的是 macOS Mojave 10 14 6 我的Qt版本是5 13 1 我的 Qt Creator 版本是 4 10 0 当我设置断点并运行应用程序时 调试器永远不会完成并打印到调试器日志并显示以下错误 因此 据我所知 lldb
  • iOS 复制和粘贴

    我正在创建一个应用程序 以便在我在 iOS 设备上复制某些内容时保存我复制的项目 无论如何 我是否可以创建一个事件 以便每当我从 iOS 设备上的任何应用程序复制某些内容时 它都会将其保存到我的应用程序中 我希望它在我复制文本时触发 以便将
  • 是否可以使用前导和跟踪来设置 Android 字体样式?

    android 字体样式中是否可以有以下内容 Leading http en wikipedia org wiki Leading 文本行之间的垂直空间 名称来自于机械印刷过程中用于分隔文本行的物理铅片 Tracking http en w
  • 什么是 Unicode、UTF-8 和 UTF-16?

    Unicode 的基础是什么 为什么需要 UTF 8 或 UTF 16 我在谷歌上研究过这个问题 也在这里搜索过 但我不清楚 In VSS https en wikipedia org wiki Microsoft Visual Sourc
  • cuda 共享内存 - 结果不一致

    我正在尝试并行缩减以对 CUDA 中的数组求和 目前我传递一个数组来存储每个块中元素的总和 这是我的代码 include
  • Log4J 仅将一个类附加到附加程序

    我需要定期轮询正在运行的应用程序的 JVM 内存统计信息 我正在运行一个服务来执行此操作并将统计信息写入根记录器 我对根记录器的使用与否没有太多控制权 我想要做的是将这些日志消息路由到单个附加程序 该附加程序应该只处理来自该类的日志消息 而
  • 如何在 python apache beam 中展平多个 Pcollection

    应该如何实现位于以下位置的以下逻辑 https beam apache org documentation pipelines design your pipeline https beam apache org documentation