避免重新计算 Beam Python SDK 中所有云存储文件的大小

2024-01-24

我正在开发一个从 Google Cloud Storage (GCS) 目录读取约 500 万个文件的管道。我已将其配置为在 Google Cloud Dataflow 上运行。

问题是,当我启动管道时,需要几个小时“计算所有文件的大小”:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

正如您所看到的,计算大约 550 万个文件的大小花了一个半小时(5549 秒),然后又从头开始!又花了2个小时跑了第二遍,然后又开始了第三遍!截至撰写本文时,该作业在 Dataflow 控制台中仍然不可用,这使我相信这一切都发生在我的本地计算机上,并且没有利用任何分布式计算。

当我使用较小的输入数据集(2 个文件)测试管道时,它会重复大小估计 4 次:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

按照这个速度,仅对所有 550 万个文件执行 GCS 大小估计 4 次就需要大约 8 小时,所有这些都是在 Dataflow 作业开始之前进行的。

我的管道配置为--runner=DataflowRunner选项,因此它应该在数据流中运行:

python bigquery_import.py --runner=DataflowRunner #other options...

管道从 GCS 读取数据如下:

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    required=True,
    help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=pipeline_options) as p:
    files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

参考bigquery_import.py https://github.com/rviscomi/bigquery/blob/8ac58f72a2367305d080e406e81ef376db8a90f7/dataflow/python/bigquery_import.py#L208-L212在 GitHub 上获取完整代码。

我很困惑为什么这个繁琐的过程发生在数据流环境之外以及为什么需要多次完成。我是否正确地从 GCS 读取文件,或者是否有更有效的方法?


感谢您报告此事。 Beam 有两种用于读取文本的转换。ReadFromText and ReadAllFromText. ReadFromText会遇到这个问题但是ReadAllFromText不应该。

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438 https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

缺点是ReadAllFromText是它不会执行动态工作重新平衡,但是在读取大量文件时这不应该成为问题。

Created https://issues.apache.org/jira/browse/BEAM-9620 https://issues.apache.org/jira/browse/BEAM-9620用于跟踪 ReadFromText(以及一般基于文件的源)的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

避免重新计算 Beam Python SDK 中所有云存储文件的大小 的相关文章

随机推荐

  • Define 是未定义的 Javascript 节点

    我正在尝试使用 Node 运行 Javascript 文件 以将博客更新发布到 Tumblr 到目前为止 在我的 main js 文件中 我有以下内容 Tumblr Information var tumblr require vendor
  • 由于图形 API 不再可用,有什么方法可以通过 API 创建 facebook 事件

    我想使用 API 创建 Facebook 事件 我了解到 Graph API 不再可用于发布事件 我还尝试了 Javascript SDK 和 PHP SDK 但收到相同的错误消息 有没有办法通过 Javascript API 创建 Fac
  • 将嵌套 XML 绑定到 CheckboxList

    我有一个嵌套的xml包含菜单和子菜单的文件 我需要在页面加载时将其绑定到 asp net CheckBox 控件 我正在尝试使用下面的 C 代码进行绑定 C DataSet ds new DataSet try Reading the da
  • 当设备方向改变时,UIWebview 中出现黑条

    我有一个 UIWebView 正在加载到另一个视图上 旋转时 纵向或横向的一切看起来都很好 但是当我处于纵向时 当我从纵向旋转到横向时 我通过捏或双击稍微放大 视图不会完全填充使用 uiwebview 右侧大约有 10 个像素变黑 如该屏幕
  • 使用 JavaScript 通过 websocket 进行视频流传输

    最快的直播方式是什么live使用 JavaScript 制作视频 TCP 上的 WebSockets 是否是足够快的协议来传输 30fps 的视频 TCP 上的 WebSockets 是否是足够快的协议来传输 30fps 的视频 是的 是的
  • TYPO3 扩展生成器多个图像上传不起作用

    我的目标是使用 TYPO3 7 6 2 版本中的扩展生成器创建扩展 我从扩展构建器文档创建了类别产品扩展 除了上传单个图像之外 它工作得很好 但我必须创建将多个图像添加到单个产品并在前端显示图像轮播的功能 但扩展生成器不适用于文件上传 我是
  • 组合连续原子变量的存储/加载

    参考 稍微过时的 paper http www open std org JTC1 SC22 WG21 docs papers 2007 n2338 html作者 Hans Boehm 在 原子操作 下 它提到内存模型 当时提出 不会阻止优
  • 使用 Swift 强制 NSLocalizedString 使用特定语言

    通过 swift 我如何强制我的应用程序从特定的 Localized strings 读取数据 我在实例化 ViewController 之前将其放入 didFinishLaunchingWithOptions 中 但它仍然以英语显示应用程
  • 斯威夫特3;范围“超出范围”

    我刚刚将 Xcode 更新到 8 0 beta 2 和 swift 3 0 从 swift 2 3 更新后 我遇到了很多错误 我有一个字符串扩展 它将 self 字符串中的范围转换为 NSRange extension String fun
  • 为什么我应该用 c++ 而不是 c 设置插件接口

    由于我的previous https stackoverflow com questions 1054697 why isnt my new operator called 问题 https stackoverflow com questi
  • C# 将图像从 PowerPoint 复制到 Word

    我需要一个应用程序将文本和图像从 PowerPoint 复制到 Word 我使用这个库 Microsoft Office Interop PowerPoint 和 Microsoft Office Interop Word 文本很容易传输
  • Android 不同屏幕尺寸的布局

    我正在为 Android 应用程序的布局而苦苦挣扎 我为不同的屏幕尺寸定义了不同的布局 当前的布局目录结构是这样的 layout 布局土地 小布局 布局 xlarge 布局 xlarge 土地 Problem 主要布局目录文件正在显示3 7
  • 联系表格 Laravel 4

    我是 Laravel 4 的菜鸟 联系表单给我带来了一些麻烦 发现了一些东西 全部都使用控制器 但我只需要在路线中使用它 如何创建简单的联系表单 姓名 电子邮件和消息 的路由以将数据发送到管理员电子邮箱 Cheers 这是一种仅使用您的路由
  • 使用 Google 脚本删除电子表格中的空白行

    Spreadsheet 1 Spreadsheet 1 中存在的数据 Name apple android windows linux Germany 3 4 6 7 America 4 1 6 2 Sweden 1 6 1 6 Paris
  • 在C中将字符数字转换为相应的整数

    C语言中有没有办法将字符转换为整数 例如 从 5 to 5 根据其他回复 这很好 char c 5 int x c 0 另外 为了进行错误检查 您可能希望首先检查 isdigit c 是否为 true 请注意 您不能完全便携地对字母执行相同
  • 迭代除 x item 之外的字典

    我有一个这种格式的字典 d data key 1 value 1 key 2 value 2 key 3 value 3 key x value x key n value n 我必须迭代它的项目 for key value in colu
  • 如何区分 Switch,Checkbox 值是由用户更改还是以编程方式(包括通过保留)更改?

    setOnCheckedChangeListener new OnCheckedChangeListener Override public void onCheckedChanged CompoundButton buttonView b
  • 在knockout js中将循环结构转换为JSON

    我有两个网格结构 在其中一个网格结构中我多次有多个字段 而在其中一个网格结构中我一次有两个字段 我为每个网格编写 apply 方法 我的第一个网格 id 工作正常 但是当我单击第二个网格上的 应用 时 我收到此错误 Uncaught Typ
  • 在 C++ 软件中纳入共享软件限制

    我希望在共享软件的基础上实现我的软件 以便用户 给予最多 例如 30 天的试用期来试用该软件 购买时 我打算向用户提供一个随机生成的密钥 输入该密钥时 再次启用该软件 我以前从未走过这条路 所以任何建议 反馈或关于如何完成此操作的 标准 方
  • 避免重新计算 Beam Python SDK 中所有云存储文件的大小

    我正在开发一个从 Google Cloud Storage GCS 目录读取约 500 万个文件的管道 我已将其配置为在 Google Cloud Dataflow 上运行 问题是 当我启动管道时 需要几个小时 计算所有文件的大小 INFO