TL;DR要么不使用 UDAF 要么使用原始类型代替ArrayType
.
Without UserDefinedFunction
两种解决方案都应该跳过内部和外部表示之间昂贵的杂耍。
使用标准聚合和pivot
这使用标准 SQL 聚合。虽然进行了内部优化,但当键的数量和数组的大小增加时,其成本可能会很高。
给定输入:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
You can:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
使用 RDD API 与combineByKey
/ aggregateByKey
.
普通旧byKey
与可变缓冲区的聚合。没有花里胡哨的东西,但在广泛的输入范围内应该表现得相当好。如果您怀疑输入稀疏,您可以考虑更有效的中间表示,例如可变的Map
.
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Using UserDefinedFunction
与原始类型
据我了解内部结构,性能瓶颈是ArrayConverter.toCatalystImpl https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L159-L174.
看起来每次调用都会调用它MutableAggregationBuffer.update https://github.com/apache/spark/blob/3ff766f61afbd09dcc7a73eae02e68a39114ce3f/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala#L246,然后依次分配新的GenericArrayData
对于每个Row
.
如果我们重新定义bufferSchema
as:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
both update
and merge
可以表示为缓冲区中原始值的简单替换。调用链仍然会很长,但是它不需要副本/转换 https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L324和疯狂的分配。省略null
检查你需要类似的东西
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
and
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
分别。
Finally evaluate
应该采取Row
并将其转换为输出Seq
:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
请注意,在此实现中,一个可能的瓶颈是merge
。虽然它不应该引入任何新的性能问题,但M桶,每次调用merge
is O(M).
With K唯一的密钥,以及P它将被称为分区M * K在最坏的情况下,每个键在每个分区上至少出现一次。这有效地增加了merge
组件到O(M * N * K).
一般来说,您对此无能为力。但是,如果您对数据分布做出具体假设(数据稀疏,密钥分布均匀),您可以稍微简化一下,首先进行洗牌:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
如果满足假设,则应该:
- 通过打乱稀疏对来减少打乱大小,而不是像密集数组那样,违反直觉
Rows
.
- 仅使用更新来聚合数据(每个O(1))可能仅作为索引的子集进行接触。
然而,如果一个或两个假设不满足,您可以预期洗牌大小将会增加,而更新数量将保持不变。同时,数据偏差可能会让事情变得更糟。update
- shuffle
- merge
设想。
Using Aggregator
具有“强”类型Dataset
:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
可以如下所示使用
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+
Note:
也可以看看SPARK-27296 https://issues.apache.org/jira/browse/SPARK-27296 - 用户定义聚合函数 (UDAF) 存在重大效率问题