我在跑步spark job
在有 2 个工作节点的集群中!我使用下面的代码(spark java)将计算的数据帧作为 csv 保存到工作节点。
dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);
我试图了解 Spark 如何在每个工作节点上写入多个部分文件。
Run1) worker1
has part files
and SUCCESS
; worker2
has _temporarty/task*/part*
每个任务都有运行的部分文件。
Run2) worker1
有零件文件,还有_temporary
目录;worker2
has multiple part files
谁能帮助我理解为什么会出现这种行为?
1)我应该考虑中的记录吗?outputDir/_temporary
作为输出文件的一部分以及part files in outputDir
?
2)Is _temporary
作业运行后应该删除目录并移动part
文件到outputDir
?
3)为什么我不能直接在输出目录下创建零件文件?
coalesce(1)
and repartition(1)
不能是该选项,因为 outputDir 文件本身将在500GB
Spark 2.0.2. 2.1.3
and Java 8, no HDFS
经过分析,观察到我的 Spark 工作正在使用fileoutputcommitter version 1
这是默认的。
然后我添加了要使用的配置fileoutputcommitter version 2
代替version 1
并在 AWS 中的 10 节点 Spark 独立集群中进行了测试。全部part-* files
直接在下面生成outputDirPath
中指定的dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)
我们可以设置属性
通过包含相同的内容--conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2'
in spark-submit command
或使用sparkContext设置属性javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")
我了解失败情况下的后果,如火花文档 https://spark.apache.org/docs/latest/configuration.html,但我达到了预期的结果!
Spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version,默认值为1
这
文件输出committer算法版本,有效算法版本
number:1或2。版本2可能有更好的性能,但版本1
在某些情况下可以更好地处理故障,根据
MAPREDUCE-4815。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)