我相信这个问题还有其他方面被忽视了climbage and eliasah:
如果操作不会减少数据量,则它必须以一种或另一种语义上等效的方式GroupByKey
。假设我们有RDD[(Int,String)]
:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
我们想要连接给定键的所有字符串。和groupByKey
这很简单:
rdd.groupByKey.mapValues(_.mkString(""))
天真的解决方案reduceByKey
看起来像这样:
rdd.reduceByKey(_ + _)
它很短并且可以说很容易理解,但存在两个问题:
- 效率极低,因为它创建了一个新的
String
每次都对象*
- 表明您执行的操作比实际情况要便宜,特别是如果您仅分析 DAG 或调试字符串
为了解决第一个问题,我们需要一个可变的数据结构:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
它仍然表明其他确实正在发生的事情并且非常冗长,特别是如果在脚本中重复多次的话。您当然可以提取匿名函数
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
但归根结底,这仍然意味着需要付出额外的努力来理解这段代码,增加复杂性并且没有真正的附加价值。我发现特别麻烦的一件事是显式包含可变数据结构。即使 Spark 能够处理几乎所有的复杂性,这也意味着我们不再拥有优雅的、引用透明的代码。
我的观点是,如果你真的通过各种手段减少数据量,请使用reduceByKey
。否则,你的代码会变得更难编写、更难分析,而且得不到任何回报。
Note:
这个答案主要针对ScalaRDD
API。当前的 Python 实现与其 JVM 对应物有很大不同,并且包括比 naive 提供显着优势的优化reduceByKey
实施情况groupBy
类似的操作。
For Dataset
API见数据帧/数据集分组按行为/优化.
* See Scala 与 Python 的 Spark 性能比较举一个有说服力的例子