我想使用 Spark 将大型(51GB)XML 文件(在外部 HDD 上)读取到数据帧中(使用Spark-XML 插件 https://github.com/databricks/spark-xml),进行简单的映射/过滤,重新排序,然后将其作为 CSV 文件写回磁盘。
但我总是得到一个java.lang.OutOfMemoryError: Java heap space
不管我如何调整这个。
我想了解为什么增加分区数量不能阻止 OOM 错误
难道它不应该将任务分成更多部分,以便每个部分都更小并且不会导致内存问题吗?
(Spark 不可能尝试将所有内容都填充到内存中,如果不合适就会崩溃,对吗?)
我尝试过的事情:
- 读取和写入时对数据帧重新分区/合并(5,000 和 10,000 个分区)(初始值为 1,604)
- 使用较少数量的执行器(6、4,甚至2执行者我收到 OOM 错误!)
- 减小分割文件的大小(默认为 33MB)
- 提供大量内存(我拥有的全部)
- 增加
spark.memory.fraction
至 0.8(默认为 0.6)
- 减少
spark.memory.storageFraction
至 0.2(默认为 0.5)
- set
spark.default.parallelism
到 30 和 40(我的默认值是 8)
- set
spark.files.maxPartitionBytes
至 64M(默认为 128M)
我的所有代码都在这里(请注意,我没有缓存任何内容):
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema) // defined previously
.option("rowTag", "row")
.load(s"$pathToInputXML")
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604
// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)
// filter and select only the cols i'm interested in
val dsout = df2
.where( df2.col("_TypeId") === "1" )
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
).as[Post]
// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r
// more mapping
dsout
.map{
case Post(id,title,body,tags) =>
val body1 = tagPat.replaceAllIn(body,"")
val body2 = whitespacePat.replaceAllIn(body1," ")
Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))
}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)
NOTES
- 输入拆分非常小(仅 33MB),那么为什么我不能让 8 个线程每个处理一个拆分呢?它真的不应该破坏我的记忆(我已经知道
UPDATE我写了一个较短的版本只读取文件然后 forEachPartition(println) 的代码。
我遇到同样的 OOM 错误:
val df: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.format("com.databricks.spark.xml")
.schema(customSchema)
.option("rowTag", "row")
.load(s"$pathToInputXML")
.repartition(numPartitions)
println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
df
.where(df.col("_PostTypeId") === "1")
.select(
df("_Id").as("id"),
df("_Title").as("title"),
df("_Body").as("body"),
df("_Tags").as("tags")
).as[Post]
.map {
case Post(id, title, body, tags) =>
Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
}
.foreachPartition { rdd =>
if (rdd.nonEmpty) {
println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
}
}
P.S.:我使用的是 Spark v 2.1.0。我的机器有 8 核和 16 GB 内存。