我有多个作业想要并行执行,这些作业使用分区将每日数据附加到同一路径中。
e.g.
dataFrame.write().
partitionBy("eventDate", "category")
.mode(Append)
.parquet("s3://bucket/save/path");
作业 1 - 类别 =“billing_events”
作业 2 - 类别 =“click_events”
这两个作业都会在执行之前截断 s3 存储桶中存在的任何现有分区,然后将生成的 parquet 文件保存到各自的分区。
i.e.
作业 1 -> s3://bucket/save/path/eventDate=20160101/channel=billing_events
作业 2 -> s3://bucket/save/path/eventDate=20160101/channel=click_events
我面临的问题是 Spark 在作业执行期间创建的临时文件。它将工作文件保存到基本路径
s3://bucket/save/path/_temporary/...
因此,这两个作业最终共享相同的临时文件夹并导致冲突,我注意到这可能会导致一个作业删除临时文件,而另一个作业失败,并显示来自 s3 的 404 消息,表示预期的临时文件不存在。
有没有人遇到过这个问题并提出了在同一基本路径中并行执行作业的策略?
我现在使用spark 1.6.0
因此,在阅读了很多关于如何解决这个问题的文章后,我想我应该将一些智慧传回这里来总结一下。主要感谢塔尔的评论。
我还发现直接写入 s3://bucket/save/path 似乎很危险,因为如果一个作业被终止并且临时文件夹的清理在作业结束时没有发生,那么它似乎就留在那里了下一个工作,我注意到有时之前被杀死的工作临时文件会落在 s3://bucket/save/path 中并导致重复...完全不可靠...
此外,将 _temporary 文件夹文件重命名为其相应的 s3 文件需要花费大量时间(每个文件大约 1 秒),因为 S3 仅支持复制/删除而不支持重命名。此外,只有驱动程序实例使用单个线程重命名这些文件,因此具有大量文件/分区的某些作业中多达 1/5 的时间都花在等待重命名操作上。
由于多种原因,我排除了使用 DirectOutputCommitter。
- 当与推测模式结合使用时,会导致重复(https://issues.apache.org/jira/browse/SPARK-9899 https://issues.apache.org/jira/browse/SPARK-9899)
- 任务失败会留下混乱,以后无法找到并删除/清理。
- Spark 2.0 已完全删除对此的支持,并且不存在升级路径。(https://issues.apache.org/jira/browse/SPARK-10063 https://issues.apache.org/jira/browse/SPARK-10063)
执行这些作业的唯一安全、高性能且一致的方法是首先将它们保存到 hdfs 中唯一的临时文件夹(通过 applicationId 或时间戳唯一)。并在作业完成时复制到 S3。
这允许并发作业执行,因为它们将保存到唯一的临时文件夹,无需使用 DirectOutputCommitter,因为 HDFS 上的重命名操作比 S3 更快,并且保存的数据更加一致。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)