lag
回报o.a.s.sql.Column
这是不可序列化的。同样的事情也适用于WindowSpec
。在交互模式下,这些对象可以作为闭包的一部分包含在内map
:
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
df: org.apache.spark.sql.DataFrame = [x: string, y: int]
scala> val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097
scala> val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
scala> def f(x: Any) = x.toString
f: (x: Any)String
scala> df.select(lag_y).map(f _).first
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
- object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)
一个简单的解决方案是将两者标记为瞬态:
scala> @transient val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470
scala> @transient val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
scala> df.select(lag_y).map(f _).first
res1: String = [null]