If you want to stay entirely within Spark SQL:
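import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell
val df = Seq(
  (1, Some("ostrich"), Some(12345), None, Some("ABC"), "11-02-2018"),
  (1, None, None, None, Some("BCD"), "10-02-2018"),
  (1, Some("Shah"), Some(12345), None, None, "12-02-2018"),
  (2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")
).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")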
val df1 = df.withColumn("date", to_date($"UpdatedtimeStamp", "MM-dd-yyyy")).drop("UpdatedtimeStamp")
// Sort each ID's rows newest first. Spark does not guarantee that this order survives a groupBy shuffle.
val win = Window.partitionBy("ID").orderBy($"date".desc)
val df2 = df1.select($"*", row_number().over(win).as("r")).orderBy($"ID", $"r").drop("r")
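// collect_list skips nulls, so with rows sorted newest first, element 0 is the latest non-null value.
val exprs = df2.columns.drop(1).map(x => collect_list(x).as(x + "_grp"))
val df3 = df2.groupBy("ID").agg(exprs.head, exprs.tail: _*)
// Take element 0 of each list (null for an empty list when ANSI mode is off) and restore the column name.
val exprs2 = df3.columns.drop(1).map(x => col(x)(0).as(x.stripSuffix("_grp")))
df3.select((col("ID") +: exprs2): _*).show()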
+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License|      date|
+---+----+--------+-------+-------+----------+
|  1|Shah|   12345|   null|    ABC|2018-12-02|
|  2|  PJ|    null|    ANB|      a|2018-10-02|
+---+----+--------+-------+-------+----------+
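
To sanity-check the expected result on small data without a cluster, the same "latest non-null value per column, per ID" rule can be modelled with plain Scala collections. This is only a sketch and not part of the Spark answer: the names `Rec`, `LatestNonNull`, `latestNonNull` and `sample` are made up for illustration, and the dates are assumed to be in `MM-dd-yyyy` form as in the code above.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical row type mirroring the DataFrame columns above.
final case class Rec(
  id: Int,
  name: Option[String],
  passport: Option[Int],
  country: Option[String],
  license: Option[String],
  date: LocalDate
)

object LatestNonNull {
  private val fmt = DateTimeFormatter.ofPattern("MM-dd-yyyy")

  def parse(s: String): LocalDate = LocalDate.parse(s, fmt)

  // For each id: sort its rows newest first, then keep the first defined value of every column.
  def latestNonNull(rows: Seq[Rec]): Seq[Rec] =
    rows.groupBy(_.id).toSeq.sortBy(_._1).map { case (id, group) =>
      val newestFirst = group.sortBy(_.date.toEpochDay)(Ordering[Long].reverse)
      Rec(
        id,
        newestFirst.flatMap(_.name.toList).headOption,
        newestFirst.flatMap(_.passport.toList).headOption,
        newestFirst.flatMap(_.country.toList).headOption,
        newestFirst.flatMap(_.license.toList).headOption,
        newestFirst.head.date // the newest date in the group
      )
    }

  // Same four rows as the DataFrame above.
  val sample: Seq[Rec] = Seq(
    Rec(1, Some("ostrich"), Some(12345), None, Some("ABC"), parse("11-02-2018")),
    Rec(1, None, None, None, Some("BCD"), parse("10-02-2018")),
    Rec(1, Some("Shah"), Some(12345), None, None, parse("12-02-2018")),
    Rec(2, Some("PJ"), None, Some("ANB"), Some("a"), parse("10-02-2018"))
  )
}
```

Calling `LatestNonNull.latestNonNull(LatestNonNull.sample).foreach(println)` prints one merged record per ID, which can be compared against the `show` output above. Unlike `collect_list` after a `groupBy`, the sort here happens inside each group, so the result does not depend on row order surviving a shuffle.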