用平均值替换缺失值 - Spark Dataframe

2024-03-02

我有一个 Spark Dataframe,其中缺少一些值。我想通过用该列的平均值替换缺失值来执行简单的插补。我对 Spark 很陌生,所以我一直在努力实现这个逻辑。到目前为止,这是我设法做到的:

a) 要对单个列(假设是 A 列)执行此操作,这行代码似乎有效:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

b)但是,我无法弄清楚如何对数据框中的所有列执行此操作。我正在尝试 Map 函数,但我相信它循环遍历数据帧的每一行

c)SO上有一个类似的问题 -here https://stackoverflow.com/questions/38356476/spark-replace-null-values-in-dataframe-with-mean-of-column。虽然我喜欢这个解决方案(使用聚合表和合并),但我非常想知道是否有一种方法可以通过循环遍历每一列来实现这一点(我来自 R,因此使用高阶函数循环遍历每一列,例如lapply 对我来说似乎更自然)。

Thanks!


火花 >= 2.2

您可以使用org.apache.spark.ml.feature.Imputer(支持均值和中值策略)。

Scala :

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

Python:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)

火花

给你:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

where

df.columns.map(mean(_)): Array[Column] 

计算每列的平均值,

df.select(_: *).first.toSeq: Seq[Any]

收集聚合值并将行转换为Seq[Any](我知道这不是最理想的,但这是我们必须使用的 API),

df.columns.zip(_).toMap: Map[String,Any] 

creates aMap: Map[String, Any]它从列名称映射到其平均值,最后:

df.na.fill(_): DataFrame

使用以下方法填充缺失值:

fill: Map[String, Any] => DataFrame 

from DataFrameNaFunctions.

忽略NaN您可以替换的条目:

df.select(df.columns.map(mean(_)): _*).first.toSeq

with:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

用平均值替换缺失值 - Spark Dataframe 的相关文章

随机推荐