我正在尝试以 JSON 格式将数据帧写入 s3 位置。但是每当执行器任务失败并且 Spark 重试该阶段时,它就会抛出FileAlreadyExistsException
.
A 类似的问题 https://stackoverflow.com/questions/57471781/spark-filealreadyexistsexception-on-stage-failure之前已被问过,但它使用单独的 Spark conf 解决了 ORC 文件,并且没有解决我的问题。
这是我的代码:
val result = spark.sql(query_that_OOMs_executor)
result.write.mode(SaveMode.Overwrite).json(s3_path)
从 Spark UI 中,执行器上的错误显示
ExecutorLostFailure (executor 302 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 4.5 GB of 4.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
但驱动程序堆栈跟踪显示
Job aborted due to stage failure: Task 1344 in stage 2.0 failed 4 times, most recent failure: Lost task 1344.3 in stage 2.0 (TID 25797, executor.ec2.com, executor 217): org.apache.hadoop.fs.FileAlreadyExistsException: s3://prod-bucket/application_1590774027047/-650323473_1594243391573/part-01344-dc971661-93ef-4abc-8380-c000.json already exists
我该如何让 Spark 尝试覆盖这个 JSON 文件?这样,一旦 4 次重试都失败,我就能得到驱动程序的真正原因。我已经将模式设置为覆盖,所以这没有帮助。
发生此问题是因为存在根本问题DirectFileOutputCommitter
默认情况下此处使用的。
这里有两件事:执行者OOM,然后FileAlreadyExistsException
重试时会导致重试(以及 SQL 查询)失败。
Reason:
The DirectFileOutputCommitter
将尝试在单个任务尝试中将输出文件写入最终输出路径。它将通过写入暂存目录,然后重命名为最终路径并删除原始路径来做到这一点。这很糟糕,容易出现不一致和错误,Spark 也不推荐这样做。
相反,我用了Netflix S3 提交者 https://github.com/rdblue/s3committer这将以多部分的方式做到这一点。它将首先在本地磁盘上写入文件,然后在任务提交期间,每个文件都会分部分上传到 S3,但不会立即可见,然后在作业提交期间(只有当所有任务完成时才会发生)成功,这是一个安全的操作)本地磁盘数据将被删除,上传将完成(现在数据将在 S3 上可见)。这可以防止失败的任务直接将内容写入 S3,从而避免FileAlreadyExistsException
重试时。
现在对于执行程序 OOM — 我的查询仍然会发生这种情况,但重试成功,之前也失败了DirectFileOutputCommitter
.
为了解决这个问题,我基本上做了
set spark.sql.sources.outputCommitterClass=com.netflix.s3.S3DirectoryOutputCommitter;
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)