Let me simplify your question a bit. (A lot actually.) We have an RDD[(Int, String)] and we want to find the 10 most common Strings for each Int (all of which are in the 0–100 range).
Instead of sorting, as in your example, it is more efficient to use Spark's built-in RDD.top(n) method. Its run time is linear in the size of the data, and it moves much less data around than a sort would.
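For a single list of strings the pattern is short (a minimal sketch, assuming a SparkContext named sc):

// Count the strings, then take the 10 most frequent with RDD.top.
val words = sc.parallelize(Seq("a", "b", "a", "c", "a", "b"))
val top10: Array[(Long, String)] =
  words.map(_ -> 1L).reduceByKey(_ + _) // (string, count) pairs
    .map { case (w, c) => (c, w) }      // count first, so it drives the ordering
    .top(10)                            // keeps at most 10 items per partition

top only ever keeps n items per partition, which is why it moves so little data. The complication in your case is doing this per key.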
Take a look at the implementation of top in RDD.scala (https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1113). You want to do the same, but with one priority queue (heap) per Int key. The code becomes fairly complex:
import org.apache.spark.rdd.RDD
import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the counts for each (column, string) pair.
  val counts: RDD[((Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps = collection.mutable.Map[Int, Heap]()
      for (((k, v), count) <- items) {
        // getOrElseUpdate stores the new heap in the map; withDefault would not.
        heaps.getOrElseUpdate(k, new Heap(n)) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: collection.Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps = collection.mutable.Map[Int, Heap]()
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps.getOrElseUpdate(k, new Heap(n)) += cv
        }
      }
      heaps
    }
  // Discard the counts, return just the top strings.
  merged.mapValues(_.map { case (count, value) => value }).toMap
}
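Here is how one might call it (a hypothetical example, again assuming a SparkContext named sc):

val rdd = sc.parallelize(Seq(0 -> "apple", 0 -> "apple", 0 -> "pear", 1 -> "kiwi"))
// Roughly Map(0 -> [apple, pear], 1 -> [kiwi]); iteration order within a
// key is heap order, not sorted by count.
println(top(2, rdd))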
This works, but it is painful, because we have to handle all the columns at the same time. It would be much easier to have one RDD per column and just call rdd.top(10) on each of them.

Unfortunately the naive way to split up the RDD into N smaller RDDs makes N passes over the data:
def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache() // We will make N passes over this RDD.
  (0 until columns).map { i =>
    together.filter { case (key, value) => key == i }.values
  }
}
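Once split, finding the most common strings per column is just the single-column pattern from earlier, applied to each RDD in turn (a sketch, reusing the hypothetical names from above):

val byColumn: Seq[RDD[String]] = split(together, columns = 101) // keys are 0-100
val top10PerColumn: Seq[Array[(Long, String)]] = byColumn.map { col =>
  col.map(_ -> 1L).reduceByKey(_ + _).map { case (v, c) => (c, v) }.top(10)
}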
A more efficient solution could be to write the data out into separate files by key, and then load it back into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job (https://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job).
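The core of that approach is a custom output format that routes each record to a file named after its key. A sketch adapted from the linked question (KeyBasedOutputFormat is a made-up name and the output path is hypothetical):

import org.apache.spark.HashPartitioner
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Writes each record to a file named after its key; the key itself is
// suppressed, so the files contain only the values.
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

together
  .partitionBy(new HashPartitioner(101)) // all values of a key land in one task
  .saveAsHadoopFile("/tmp/by-column",    // hypothetical output directory
    classOf[Int], classOf[String], classOf[KeyBasedOutputFormat])

After this, sc.textFile("/tmp/by-column/0"), sc.textFile("/tmp/by-column/1"), and so on load each column back as its own RDD, after a single pass over the original data.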