Spark柱状性能

2023-12-19

我对 Spark 是一个相对初学者。我有一个宽数据框(1000 列),我想根据相应列是否缺少值来添加列

so



+----+          
| A  |
+----+
| 1  |
+----+
|null|     
+----+
| 3  |
+----+
  

becomes



+----+-------+          
| A  | A_MIS |
+----+-------+
| 1  |   0   |
+----+-------+
|null|   1   |
+----+-------+
| 3  |   1   |
+----+-------+
  

这是自定义机器学习转换器的一部分,但算法应该很清晰。

override def transform(dataset: org.apache.spark.sql.Dataset[_]): org.apache.spark.sql.DataFrame = {
  var ds = dataset
  dataset.columns.foreach(c => {
    if (dataset.filter(col(c).isNull).count() > 0) {
      ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
    }
  })


  ds.toDF()
}

循环列,如果 > 0 个空值则创建一个新列。

传入的数据集被缓存(使用 .cache 方法),相关配置设置为默认值。 目前,它在一台笔记本电脑上运行,即使行数极少,运行 1000 列也需要 40 分钟左右。 我认为问题是由于访问数据库造成的,因此我尝试使用镶木地板文件,但得到了相同的结果。查看作业 UI,它似乎正在执行文件扫描以进行计数。

有没有办法改进这个算法以获得更好的性能,或者以某种方式调整缓存?增加spark.sql.inMemoryColumnarStorage.batchSize只会给我带来OOM错误。


删除条件:

if (dataset.filter(col(c).isNull).count() > 0) 

并只留下内部表达。正如所写,Spark 需要 #columns 数据扫描。

如果您希望修剪列计算一次统计信息,如中所述使用 Pyspark 计算 Spark 数据帧每列中非 NaN 条目的数量 https://stackoverflow.com/q/33900726/8371915,并使用单个drop call.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark柱状性能 的相关文章

随机推荐