使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

2024-03-24

我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们,其中集合中的每个元素都是代表整个文件的字符串,但我找不到关于如何完成此操作的合适示例,我也无法理解它来自 Apache Beam 文档,主要是关于 Java 版本的。

我当前的管道如下所示:

p = beam.Pipeline(options=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.Read(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()

我收到的错误消息是:

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)

File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals)  # execute the script

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()

File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'

非常感谢任何帮助。 谢谢 托默

解决了第一个问题:事实证明这不适用于 DirectRunner,将运行器更改为 DataFlowRunner 并替换Read with 从文本读取解决了异常:

p = 束.Pipeline(选项=PipelineOptions(pipeline_args))

(p
 | 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
 | 'String To BigQuery Row' >> beam.Map(lambda s:
                                        data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish() 

但现在我看到这种方法为我提供了每个文件中的一行作为管道元素,而我希望将整个文件作为字符串作为每个元素。 不知道该怎么做。我发现这个帖子 https://stackoverflow.com/questions/45920895/read-a-file-from-gcs-in-apache-beam/45946643#45946643但它是用java编写的,不确定它如何与python和gcs版本一起工作。

所以看起来 ReadFromText 不适用于我的用例,而且我不知道如何创建文件管道。

解决方案: 感谢 Ankur 的帮助,我修改了代码以包含从 MatchResult 对象列表转换所需的步骤,这是 GCSFileSystem 返回的字符串 pCollection,每个字符串代表一个文件。

p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)

(p
 | 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
 | 'metadata_list to filepath' >> beam.FlatMap(lambda metadata_list: [metadata.path for metadata in metadata_list])
 | 'string To BigQuery Row' >> beam.Map(lambda filepath:
                                        data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
 | 'Write to BigQuery' >> beam.io.Write(
            beam.io.BigQuerySink(
                known_args.output,
                schema='title:STRING,text:STRING,id:STRING',
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                # Appends data to the BigQuery table
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()

该代码使用此帮助程序类来读取 gcs 文件:

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

  def get_string_from_filepath(self,filepath):
      with self.gcs.open(filepath) as reader:
          res = reader.read()

      return res

ReadFromText 逐行读取给定路径中的文件。 您想要的是文件列表,然后使用 GcsFileSystem 在 ParDo 中一次读取一个文件https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsfilesystem.py https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/gcsfilesystem.py然后将内容写入 BigQuery。

您还可以参考类似主题的邮件主题https://lists.apache.org/thread.html/85da22a845cef8edd942fcc4906a7b47040a4ae8e10aef4ef00be233@%3Cuser.beam.apache.org%3E https://lists.apache.org/thread.html/85da22a845cef8edd942fcc4906a7b47040a4ae8e10aef4ef00be233@%3Cuser.beam.apache.org%3E

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 Google Cloud DataFlow python sdk 读取一组 xml 文件 的相关文章

  • 根据随机选择的列生成随机天数

    我有一个如下所示的数据框 感谢 SO 社区在以下方面提供的帮助 df1 pd DataFrame person id 11 11 12 13 14 date birth 01 01 1961 12 30 1961 05 29 1967 01
  • 如何在groupby之后将pandas数据框拆分为许多列

    我希望能够在 pandas 中使用 groupby 按列对数据进行分组 然后将其拆分 以便每个组都是数据框中自己的列 e g time data 0 1 2 0 1 2 3 0 2 3 4 0 3 1 2 1 4 2 3 1 5 3 4 1
  • multiprocessing.freeze_support()

    为什么多处理模块需要调用特定的function http docs python org dev library multiprocessing html multiprocessing freeze support在被 冻结 以生成 Wi
  • 远程控制或脚本打开 Office 从 Python 编辑 Word 文档

    我想 最好在 Windows 上 在特定文档上启动 Open Office 搜索固定字符串并将其替换为我的程序选择的另一个字符串 我该如何从外部 Python 程序中做到这一点 OLE 什么 原生 Python 脚本解决方案 The doc
  • 使用 Python 中的 IAM 角色访问 AWS API Gateway

    我有一个 AWS API 网关 我想使用它来保护其安全IAM 角色 http docs aws amazon com apigateway latest developerguide permissions html 我正在寻找一个包来帮助
  • 使用 Python 解析 XML,解析外部 ENTITY 引用

    在我的 S1000D xml 中 它指定了一个带有对公共 URL 的引用的 DOCTYPE 该 URL 包含对包含所有有效字符实体的许多其他文件的引用 我使用 xml etree ElementTree 和 lxml 尝试解析它并得到解析错
  • 使用reduce方法的斐波那契数列

    于是 我看到有人用reduce方法来计算斐波那契数列 这是他的想法 1 0 1 1 2 1 3 2 5 3 对应于 1 1 2 3 5 8 13 21 代码如下所示 def fib reduce n initial 1 0 dummy ra
  • 从 python 中的缩进文本文件创建树/深度嵌套字典

    基本上 我想迭代一个文件并将每行的内容放入一个深层嵌套的字典中 其结构由每行开头的空格数量定义 本质上 目标是采取这样的事情 a b c d e 并将其变成这样的东西 a b c d e Or this apple colours red
  • 如何在python中递归复制目录并覆盖全部?

    我正在尝试复制 home myUser dir1 及其所有内容 及其内容等 home myuser dir2 在Python中 此外 我希望副本覆盖中的所有内容dir2 It looks like distutils dir util co
  • Docker 日志中的 Python 异常标记为流:stdout

    我想解析和处理来自 docker 容器的所有错误 但当我期望 stderr 时 Python 异常标记为 stdout 举个简单的例子app py raise Exception 然后我在 docker 容器中运行这个文件 但在 var l
  • python 中的基本矩阵转置

    我尝试了 python 中矩阵转置的最基本方法 但是 我没有得到所需的结果 接下来是代码 A 1 1 1 1 2 2 2 2 3 3 3 3 4 4 4 4 print A def TS A B A for i in range len A
  • 使用 pandas 绘制带有误差线的条形图

    我正在尝试从 DataFrame 生成条形图 如下所示 Pre Post Measure1 0 4 1 9 这些值是我从其他地方计算出来的中值 我还有它们的方差和标准差 以及标准误差 我想将结果绘制为具有适当误差线的条形图 但指定多个误差值
  • Scikit Learn - K-Means - 肘部 - 标准

    今天我想学习一些关于 K means 的知识 我已经了解该算法并且知道它是如何工作的 现在我正在寻找正确的 k 我发现肘部准则作为检测正确的 k 的方法 但我不明白如何将它与 scikit learn 一起使用 在 scikit learn
  • 向伪 shell (pty) 发出命令

    我尝试使用 subprocess popen os spawn 来运行进程 但似乎需要伪终端 import pty master slave pty openpty os write master ls l 应该发送 ls l 到从属终端
  • Windows 与 Linux 文本文件读取

    问题是 我最近从 Windows 切换到 Ubuntu 我的一些用于分析数据文件的 python 脚本给了我错误 我不确定如何正确解决 我当前仪器的数据文件输出如下 Header 有关仪器等的各种信息 Data 状态 代码 温度 字段等 0
  • Python“self”关键字[重复]

    这个问题在这里已经有答案了 我是 Python 新手 通常使用 C 最近几天开始使用它 在类中 是否需要在对该类的数据成员和方法的任何调用前添加前缀 因此 如果我在该类中调用方法或从该类获取值 我需要使用self method or sel
  • 字母尺度和随机文本上的马尔可夫链

    我想使用 txt 文件中的一本书中的字母频率生成随机文本 以便每个新字符 string lowercase 取决于前一个 如何使用马尔可夫链来做到这一点 或者使用每个字母都有条件频率的 27 个数组更简单 我想使用来自的字母频率生成随机文本
  • 使用Python重命名目录中的多个文件

    我正在尝试使用以下 Python 脚本重命名目录中的多个文件 import os path Users myName Desktop directory files os listdir path i 1 for file in files
  • 将数组从 .npy 文件读入 Fortran 90

    我使用 Python 以二维数组 例如 X 的形式生成一些初始数据 然后使用 Fortran 对它们进行一些计算 最初 当数组大小约为 10 000 x 10 000 时 np savetxt 在速度方面表现良好 但是一旦我开始增加数组的维
  • 如何抑制 Pandas Future 警告?

    当我运行该程序时 Pandas 每次都会给出如下所示的 未来警告 D Python lib site packages pandas core frame py 3581 FutureWarning rename with inplace

随机推荐

  • 在 Mac 上解压缩大型 ZIP 档案(例如最新的 Microsoft Edge VM)

    尝试在我的 Mac 上解压最新的 Microsoft Edge Vm zip 文件后 我最终得到一个 cpgz 文件 它应该为 virtualBox 生成一个 ova 文件 有人知道怎么修这个东西吗 http mattormeeple co
  • 何时使用 handler.post() 以及何时使用 new Thread()

    我想知道什么时候应该使用handler post runnable 我什么时候应该使用new Thread runnable start Handler 的开发人员文档中提到了这一点 导致 Runnable r 添加到消息队列中 可运行的
  • MS Word 在 python 中的读/写、Python-docx 问题和 win32com 参考?

    最近 我正在尝试使用不同的 API 进行 MS Word 文件管理 现在正在编写 此时我只需要一个简单的编写Python API 我尝试了 win32com 模块 事实证明该模块非常强大 但缺乏 python 在线示例 对 VB 和 C 的
  • nginx 位置正则表达式 - 字符类和匹配范围

    我正在尝试为路径设置正则表达式 s lt 4 6 character string here gt 我将 4 6 个字符串捕获为 1 我尝试使用以下两个条目 但都失败了 location s 0 9a zA Z 4 6 location s
  • 不同Y级别的UIButton无法在tvOS上聚焦

    我开始为 Apple TV 开发应用程序 但在按钮方面遇到了这个问题 我有一个屏幕 屏幕底部有几个按钮 中间有一个按钮 请参阅屏幕截图以了解 问题是我无法集中中间按钮 焦点仅位于底部的三个按钮上 问题显然是由Y位置 当我移动屏幕底部的按钮时
  • 命令未存储在命令历史记录中

    背景 https stackoverflow com questions 76566358 这个答案 https stackoverflow com a 47595405 2153235依赖于 readline 模块并且write hist
  • 在 MVC razor 视图中的 foreach 循环中对单选按钮进行分组?

    我尝试通过在 html 帮助中提供额外的 html 属性来对循环内的一组单选按钮进行分组 如下所示 ol class Opt foreach var opt in quest Options li class Opt Html RadioB
  • 从 git 包中恢复所有引用

    如何从备份恢复所有远程分支 git bundle Backup git bundle create tmp dp all git bundle list heads tmp dp head n5 f37c9fc7f0ce121568f42f
  • 改进低效的 jQuery 选择器

    在 IntelliJ 中 如果我使用 jQuery 选择器 例如 roleField option each function impl omitted 选择器突出显示 并建议我应该 以 ID 选择器开头的拆分后代选择器 IntelliJ
  • 语音训练文件和注册表位置

    我有一个演讲项目 需要用代码完成声学训练 我能够使用 SAPI 在 Windows 7 下成功创建包含成绩单及其关联注册表项的培训文件 但是 我无法确定识别引擎是否成功使用这些文件并调整其模型 我的问题如下 通过控制面板训练 UI 执行训练
  • Cordova InAppBrowser 不会缩放加载的页面

    谁能帮我获取 Cordova InAppBrowser 在 Android 应用程序上加载的外部页面以适合手机屏幕尺寸 我使用以下代码从 Sencha Touch 应用程序调用 inappbrowser var opt location n
  • AngularJS 'scrollTop' 等效吗?

    我希望在 AngularJS 指令中实现类似的东西 https github com geniuscarrier scrollToTop blob master jquery scrollToTop js https github com
  • 不同 int 类型的运算

    我有一个使用多种不同 int 类型的程序 最常用的是uint64 t和标准int 但是我想知道我是否可以安全地进行它们之间的混合操作 例如我有一个uint64 t我想添加一个int并将该值存储为另一个值uint64 t 做这样的事情安全吗
  • Flask-sqlalchemy和oracle数据库id不自动递增

    我想使用 Python 和 Flask 框架以及 SQLAlchemy 模块在我的数据库 Oracle 11g 但 Ubuntu 16 04 的 Express 版本 中创建一个新表 表的第一个字段是 ID 是一个整数字段 我希望它自动增量
  • 加密/解密 C 语言的 Python 脚本

    重复项 我还没有找到答案 https stackoverflow com questions 4066361 how to obfuscate python code https stackoverflow com questions 40
  • MSBuild 目录结构限制解决方法

    有没有人有办法克服 MSBuild 工具的 260 个字符限制 用于从命令行构建 Visual Studio 项目和解决方案 我试图使用 CruiseControl 实现自动化构建 CruiseControl NET 不是一个选项 所以我试
  • MVC2 ASP.Net URL 是否自动编码?

    所以我像这样调用 MVC2 中的控制器 并且可以很好地访问经典查询字符串 请注意 第二个参数 thing2 已经是 URLEncoded 再次检索 URLEncoded 查询字符串和 URLDecoding 是没有问题的 我的例子看起来像这
  • 使用本地主机时 GuzzleHttp 挂起

    这是一个简单的代码片段 但这只是挂起并且没有响应 httpClient new GuzzleHttp Client version 6 x headers X API KEY gt 123456 request httpClient gt
  • VisualSVN Server:使用 svnadmin 工具进行备份

    我已经在 Windows Server 2008 上安装了 VisualSVN Server 我尝试使用 svnadmin 工具创建热复制备份 我好像没有安装这个工具 C gt svnadmin hotcopy C Repositories
  • 使用 Google Cloud DataFlow python sdk 读取一组 xml 文件

    我正在尝试从 GCS 存储桶读取 XML 文件的集合并处理它们 其中集合中的每个元素都是代表整个文件的字符串 但我找不到关于如何完成此操作的合适示例 我也无法理解它来自 Apache Beam 文档 主要是关于 Java 版本的 我当前的管