我有一个非常大的 Spark DataFrame,其中有许多列,我想对是否将它们保留在我的管道中做出明智的判断,部分取决于它们有多大。我所说的“有多大”是指缓存此 DataFrame 时 RAM 中的大小(以字节为单位),我希望这是对处理此数据的计算成本的一个不错的估计。有些列是简单类型(例如双精度数、整数),但其他列是复杂类型(例如数组和可变长度的映射)。
我尝试过的一种方法是缓存没有和有问题的列的 DataFrame,检查 Spark UI 中的“存储”选项卡,并获取差异。但对于具有很多列的 DataFrame 来说,这是一个烦人且缓慢的练习。
我通常使用 PySpark,因此 PySpark 答案会更好,但 Scala 也可以。
我找到了一个基于此相关答案的解决方案:https://stackoverflow.com/a/49529028 https://stackoverflow.com/a/49529028.
假设我正在使用一个名为的数据框df
and a SparkSession
称为的对象spark
:
import org.apache.spark.sql.{functions => F}
// force the full dataframe into memory (could specify persistence
// mechanism here to ensure that it's really being cached in RAM)
df.cache()
df.count()
// calculate size of full dataframe
val catalystPlan = df.queryExecution.logical
val dfSizeBytes = spark.sessionState.executePlan(catalystPlan).optimizedPlan.stats.sizeInBytes
for (col <- df.columns) {
println("Working on " + col)
// select all columns except this one:
val subDf = df.select(df.columns.filter(_ != col).map(F.col): _*)
// force subDf into RAM
subDf.cache()
subDf.count()
// calculate size of subDf
val catalystPlan = subDf.queryExecution.logical
val subDfSizeBytes = spark.sessionState.executePlan(catalystPlan).optimizedPlan.stats.sizeInBytes
// size of this column as a fraction of full dataframe
val colSizeFrac = (dfSizeBytes - subDfSizeBytes).toDouble / dfSizeBytes.toDouble
println("Column space fraction is " + colSizeFrac * 100.0 + "%")
subDf.unpersist()
}
一些证据表明这种方法给出了合理的结果:
- 报告的列大小加起来为 100%。
- 简单类型列(例如整数或双精度数)每行占用预期 4 个字节或 8 个字节。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)