使用 Spark 1.4.0、Scala 2.10
我一直在试图找出一种方法来用最后一个已知的观察结果转发填充空值,但我没有看到一个简单的方法。我认为这是一件很常见的事情,但找不到显示如何执行此操作的示例。
我看到用值向前填充 NaN 的函数,或用偏移量填充或移动数据的滞后/超前函数,但没有任何东西可以获取最后一个已知值。
在网上查看,我在 R 中看到了很多关于相同问题的问答,但在 Spark / Scala 中却没有。
我正在考虑映射日期范围,从结果中过滤出 NaN 并选择最后一个元素,但我想我对语法感到困惑。
使用 DataFrames 我尝试类似的方法
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
但这对我没有任何帮助。
过滤部分不起作用;映射函数返回spark.sql.Columns的序列,但过滤器函数期望返回布尔值,因此我需要从列中获取一个值进行测试,但似乎只有返回列的列方法。
有没有办法在 Spark 上更“简单”地做到这一点?
感谢您的输入
Edit:
简单示例示例输入:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
预期输出:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Note:
- 我有很多列,其中许多列都有这种缺失的数据模式,但不是在同一日期/时间。如果需要,我将一次转换一列。
EDIT:
按照 @zero323 的回答,我尝试了以下方法:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
广播变量最终作为不带空值的值列表。这是进步,但我仍然无法让映射工作。
但我什么也没得到,因为索引i
不映射到原始数据,它映射到不带 null 的子集。
我在这里缺少什么?
编辑和解决方案(从 @zero323 的答案推断):
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
如果您使用 RDD 而不是 DataFrame,请参阅下面 Zero323 的答案以获取更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您正在寻求优化,请查看 RDD 解决方案。