在我的应用程序中,我正在读取 40 GB 的文本文件,这些文件完全分布在 188 个文件中。
我拆分此文件并使用 rdd 对在 Spark 中每行创建 xml 文件。
对于 40 GB 的输入,它将创建数百万个小 xml 文件,这是我的要求。
一切正常,但是当 Spark 将文件保存在 S3 中时,它会抛出错误并且作业失败。
这是我得到的例外
引起原因:java.nio.file.FileSystemException:
/mnt/s3/emrfs-2408623010549537848/0000000000:打开的文件太多
sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
在
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
在
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
在
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
在 java.nio.file.Files.newByteChannel(Files.java:361) 处
java.nio.file.Files.createFile(Files.java:632) 在
com.amazon.ws.emr.hadoop.fs.files.TemporaryFiles.create(TemporaryFiles.java:70)
在
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.openNewPart(MultipartUploadOutputStream.java:493)
... 21 更多
ApplicationMaster主机:10.97.57.198 ApplicationMaster RPC端口:0
队列:默认开始时间:1542344243252 最终状态:FAILED
跟踪网址:http://ip-10-97-57-234.tr-fr-nonprod.aws-int.thomsonreuters.com:20888/proxy/application_1542343091900_0001/ http://ip-10-97-57-234.tr-fr-nonprod.aws-int.thomsonreuters.com:20888/proxy/application_1542343091900_0001/用户:线程“main”中的hadoop异常
org.apache.spark.SparkException:应用程序
application_1542343091900_0001 已完成,状态为失败
还有这个
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
请降低您的请求率。 (服务:Amazon S3;状态代码:
503;错误代码:减速;请求 ID:D33581CA9A799F64; S3 扩展
请求ID:
/SlEplo+lCKQRVVH+zHiop0oh8q8WqwnNykK3Ga6/VM2HENl/eKizbd1rg4vZD1BZIpp8lk6zwA=),
S3 扩展请求 ID:
/SlEplo+lCKQRVVH+zHiop0oh8q8WqwnNykK3Ga6/VM2HENl/eKizbd1rg4vZD1BZIpp8lk6zwA=
这是我的代码来做到这一点。
object TestAudit {
def main(args: Array[String]) {
val inputPath = args(0)
val output = args(1)
val noOfHashPartitioner = args(2).toInt
//val conf = new SparkConf().setAppName("AuditXML").setMaster("local");
val conf = new SparkConf().setAppName("AuditXML")
val sc = new SparkContext(conf);
val input = sc.textFile(inputPath)
val pairedRDD = input.map(row => {
val split = row.split("\\|")
val fileName = split(0)
val fileContent = split(1)
(fileName, fileContent)
})
import org.apache.hadoop.io.NullWritable
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}
pairedRDD.partitionBy(new HashPartitioner(10000)).saveAsHadoopFile("s3://a205381-tr-fr-development-us-east-1-trf-auditabilty//AUDITOUTPUT", classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])
}
}
即使我尝试减少 HashPartitioner 的数量,它也不起作用