我一直在使用 Postgres to S3 运算符将数据从 Postgres 加载到 S3。但最近,我必须导出一个非常大的表,并且我的 Airflow Composer 失败,没有任何日志,这可能是因为我们正在使用 Python 临时文件模块的 NamedTemporaryFile 函数来创建临时文件,并且我们正在使用这个临时文件加载到 S3 。由于我们使用的是 Composer,因此这将被加载到 Composer 的本地内存中,并且由于文件的大小非常大,因此会失败。
参考这里:https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs
我确实检查了 RedshiftToS3 运算符,因为它也使用 Postgres 钩子,并且它有几个可以轻松加载大文件的卸载选项,但我意识到 Redshift 和 Postgres 之间没有 1-1 对应关系。所以这是不可能的。有什么方法可以拆分我的 Postgres 查询吗?现在我正在做SELECT * FROM TABLENAME
另外,我没有任何有关该表的信息。
我也遇到过这个类似的运算符:https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html
这里有一个参数approx_max_file_size_bytes
:
该运算符支持将大型表转储拆分为
多个文件(请参阅上面文件名参数文档中的注释)。这
param 允许开发人员指定分割的文件大小。
我从代码中了解到的是,当大小超过给定限制时,他们正在创建一个新的临时文件,那么他们是否会将文件拆分为多个临时文件,然后分别上传?
编辑:
我将再次准确地解释我想要做什么。目前,Postgres 到 S3 操作符会创建一个临时文件,并将游标返回的所有结果写入该文件,这会导致内存问题。所以我的想法是,我可以添加 max_file_size 限制,对于游标中的每一行,我将把结果写入临时文件,如果临时文件的大小超过我们设置的 max_file_size 限制,我们将写入我们的内容文件到 S3,然后刷新或删除该文件,然后创建一个新的临时文件并将光标的下一行写入该文件,并将该文件也上传到 S3。我不知道如何像这样修改运算符?