如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3?

2024-04-24

我一直在使用 Postgres to S3 运算符将数据从 Postgres 加载到 S3。但最近,我必须导出一个非常大的表,并且我的 Airflow Composer 失败,没有任何日志,这可能是因为我们正在使用 Python 临时文件模块的 NamedTemporaryFile 函数来创建临时文件,并且我们正在使用这个临时文件加载到 S3 。由于我们使用的是 Composer,因此这将被加载到 Composer 的本地内存中,并且由于文件的大小非常大,因此会失败。

参考这里:https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs

我确实检查了 RedshiftToS3 运算符,因为它也使用 Postgres 钩子,并且它有几个可以轻松加载大文件的卸载选项,但我意识到 Redshift 和 Postgres 之间没有 1-1 对应关系。所以这是不可能的。有什么方法可以拆分我的 Postgres 查询吗?现在我正在做SELECT * FROM TABLENAME另外,我没有任何有关该表的信息。

我也遇到过这个类似的运算符:https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html

这里有一个参数approx_max_file_size_bytes:

该运算符支持将大型表转储拆分为 多个文件(请参阅上面文件名参数文档中的注释)。这 param 允许开发人员指定分割的文件大小。

我从代码中了解到的是,当大小超过给定限制时,他们正在创建一个新的临时文件,那么他们是否会将文件拆分为多个临时文件,然后分别上传?

编辑: 我将再次准确地解释我想要做什么。目前,Postgres 到 S3 操作符会创建一个临时文件,并将游标返回的所有结果写入该文件,这会导致内存问题。所以我的想法是,我可以添加 max_file_size 限制,对于游标中的每一行,我将把结果写入临时文件,如果临时文件的大小超过我们设置的 max_file_size 限制,我们将写入我们的内容文件到 S3,然后刷新或删除该文件,然后创建一个新的临时文件并将光标的下一行写入该文件,并将该文件也上传到 S3。我不知道如何像这样修改运算符?


正如您已经发现的那样,这是因为您正在用表中的每一行构建一个字典,当表中有很多行时,机器上的内存就会耗尽。

您已经真正回答了自己的问题:仅写入 a 直到文件达到一定大小,然后将文件推送到 S3。或者,您可以将文件保留在磁盘上,并每 x 行刷新字典对象,但在这种情况下,您的文件可能会在磁盘上而不是在内存中变得非常大。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3? 的相关文章

随机推荐