我对 Spark 是一个相对初学者。我有一个宽数据框(1000 列),我想根据相应列是否缺少值来添加列
so
+----+
| A |
+----+
| 1 |
+----+
|null|
+----+
| 3 |
+----+
becomes
+----+-------+
| A | A_MIS |
+----+-------+
| 1 | 0 |
+----+-------+
|null| 1 |
+----+-------+
| 3 | 1 |
+----+-------+
这是自定义机器学习转换器的一部分,但算法应该很清晰。
override def transform(dataset: org.apache.spark.sql.Dataset[_]): org.apache.spark.sql.DataFrame = {
var ds = dataset
dataset.columns.foreach(c => {
if (dataset.filter(col(c).isNull).count() > 0) {
ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
}
})
ds.toDF()
}
循环列,如果 > 0 个空值则创建一个新列。
传入的数据集被缓存(使用 .cache 方法),相关配置设置为默认值。
目前,它在一台笔记本电脑上运行,即使行数极少,运行 1000 列也需要 40 分钟左右。
我认为问题是由于访问数据库造成的,因此我尝试使用镶木地板文件,但得到了相同的结果。查看作业 UI,它似乎正在执行文件扫描以进行计数。
有没有办法改进这个算法以获得更好的性能,或者以某种方式调整缓存?增加spark.sql.inMemoryColumnarStorage.batchSize只会给我带来OOM错误。
删除条件:
if (dataset.filter(col(c).isNull).count() > 0)
并只留下内部表达。正如所写,Spark 需要 #columns 数据扫描。
如果您希望修剪列计算一次统计信息,如中所述使用 Pyspark 计算 Spark 数据帧每列中非 NaN 条目的数量 https://stackoverflow.com/q/33900726/8371915,并使用单个drop
call.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)