我正在使用 Spark 2.0.1,并希望用列中最后一个已知的值填充 nan 值。
我能找到的唯一关于火花的参考Spark / Scala:使用最后一次观察进行前向填充 or 用 pyspark 之前已知的良好值填充 null似乎使用RDD。
我宁愿留在数据框/数据集世界中并可能处理多个 nan 值。
这可能吗?
我的假设是,数据(最初从例如 CSV 文件加载)按时间排序,并且此顺序保留在分布式设置中,例如按关闭/最后一个已知值填充是正确的。也许对于大多数人来说,用先前的值填充就足够了记录中连续不存在 2 个或更多 nan 记录。这实际上成立吗?
重点是,一个
myDf.sort("foo").show
会破坏任何订单,例如全部null
价值观将是第一位的。
一个小例子:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
结果是
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
我想用最后一个已知的值来修复该值。我怎样才能实现这个目标?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
edit
就我而言,填充上面行中的值就足够了,因为只有非常有限的错误值。
edit2
我尝试添加索引列
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
然后填写最后一个值。
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
但上面写着以下警告:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。我怎样才能引入有意义的分区?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+