给出以下代码:
import java.sql.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object SortQuestion extends App{
val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()
import spark.implicits._
case class ABC(a: Int, b: Int, c: Int)
val first = Seq(
ABC(1, 2, 3),
ABC(1, 3, 4),
ABC(2, 4, 5),
ABC(2, 5, 6)
).toDF("a", "b", "c")
val second = Seq(
(1, 2, (Date.valueOf("2018-01-02"), 30)),
(1, 3, (Date.valueOf("2018-01-01"), 20)),
(2, 4, (Date.valueOf("2018-01-02"), 50)),
(2, 5, (Date.valueOf("2018-01-01"), 60))
).toDF("a", "b", "c")
first.join(second.withColumnRenamed("c", "c2"), Seq("a", "b")).groupBy("a").agg(sort_array(collect_list("c2")))
.show(false)
}
Spark 产生以下结果:
+---+----------------------------------+
|a |sort_array(collect_list(c2), true)|
+---+----------------------------------+
|1 |[[2018-01-01,20], [2018-01-02,30]]|
|2 |[[2018-01-01,60], [2018-01-02,50]]|
+---+----------------------------------+
这意味着 Spark 正在按日期对数组进行排序(因为它是第一个字段),但我想指示 Spark 按该嵌套结构中的特定字段进行排序。
我知道我可以将数组重塑为(value, date)
但这似乎不方便,我想要一个通用的解决方案(想象我有一个大的嵌套结构,5层深,我想按特定列对该结构进行排序)。
有没有办法做到这一点?我错过了什么吗?