在SparkSQL中,你没有选择,需要使用orderBy
具有一列或多列。对于 RDD,如果您愿意,可以使用自定义的类似 java 的比较器。确实,这是sortBy
的方法RDD
(参见Spark 2.4的scaladoc):
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
这意味着您可以提供Ordering
您选择的,这与 java 完全一样Comparator
(Ordering
实际上继承自Comparator
).
为简单起见,假设我想按列“x”的绝对值排序(这可以在没有比较器的情况下完成,但假设我需要使用比较器)。我首先在行上定义比较器:
class RowOrdering extends Ordering[Row] {
def compare(x : Row, y : Row): Int = x.getAs[Int]("x").abs - y.getAs[Int]("x").abs
}
现在让我们定义数据并对其进行排序:
val df = Seq( (0, 1),(1, 2),(2, 4),(3, 7),(4, 1),(5, -1),(6, -2),
(7, 5),(8, 5), (9, 0), (10, -9)).toDF("id", "x")
val rdd = df.rdd.sortBy(identity)(new RowOrdering(), scala.reflect.classTag[Row])
val sorted_df = spark.createDataFrame(rdd, df.schema)
sorted_df.show
+---+---+
| id| x|
+---+---+
| 9| 0|
| 0| 1|
| 4| 1|
| 5| -1|
| 6| -2|
| 1| 2|
| 2| 4|
| 7| 5|
| 8| 5|
| 3| 7|
| 10| -9|
+---+---+
另一种解决方案是定义隐式排序,以便排序时无需提供它。
implicit val ord = new RowOrdering()
df.rdd.sortBy(identity)
最后,请注意df.rdd.sortBy(_.getAs[Int]("x").abs)
会达到相同的结果。此外,您可以使用元组排序来执行更复杂的操作,例如按绝对值排序,如果相等,则将正值放在前面:
df.rdd.sortBy(x => (x.getAs[Int]("x").abs, - x.getAs[Int]("x"))) //RDD
df.orderBy(abs($"x"), - $"x") //dataframe