Spark UDAF以ArrayType作为bufferSchema性能问题

2024-04-03

我正在开发一个返回元素数组的 UDAF。

每次更新的输入是索引和值的元组。

UDAF 的作用是将同一索引下的所有值相加。

Example:

对于输入(索引,值):(2,1),(3,1),(2,3)

应该返回 (0,0,4,1,...,0)

逻辑工作正常,但我有一个问题更新方法,仅是我的实现每行更新 1 个单元格,但该方法中的最后一个分配实际上复制整个数组- 这是多余的并且非常耗时。

仅此任务就负责98% 的查询执行时间.

我的问题是,如何减少这个时间?是否可以在缓冲区数组中分配 1 个值而不必替换整个缓冲区?

P.S.:我正在使用 Spark 1.6,我无法很快升级它,所以请坚持使用适用于此版本的解决方案。

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

TL;DR要么不使用 UDAF 要么使用原始类型代替ArrayType.

Without UserDefinedFunction

两种解决方案都应该跳过内部和外部表示之间昂贵的杂耍。

使用标准聚合和pivot

这使用标准 SQL 聚合。虽然进行了内部优化,但当键的数量和数组的大小增加时,其成本可能会很高。

给定输入:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

You can:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

使用 RDD API 与combineByKey / aggregateByKey.

普通旧byKey与可变缓冲区的聚合。没有花里胡哨的东西,但在广泛的输入范围内应该表现得相当好。如果您怀疑输入稀疏,您可以考虑更有效的中间表示,例如可变的Map.

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

Using UserDefinedFunction与原始类型

据我了解内部结构,性能瓶颈是ArrayConverter.toCatalystImpl https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L159-L174.

看起来每次调用都会调用它MutableAggregationBuffer.update https://github.com/apache/spark/blob/3ff766f61afbd09dcc7a73eae02e68a39114ce3f/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala#L246,然后依次分配新的GenericArrayData对于每个Row.

如果我们重新定义bufferSchema as:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

both update and merge可以表示为缓冲区中原始值的简单替换。调用链仍然会很长,但是它不需要副本/转换 https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L324和疯狂的分配。省略null检查你需要类似的东西

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

and

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

分别。

Finally evaluate应该采取Row并将其转换为输出Seq:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

请注意,在此实现中,一个可能的瓶颈是merge。虽然它不应该引入任何新的性能问题,但M桶,每次调用merge is O(M).

With K唯一的密钥,以及P它将被称为分区M * K在最坏的情况下,每个键在每个分区上至少出现一次。这有效地增加了merge组件到O(M * N * K).

一般来说,您对此无能为力。但是,如果您对数据分布做出具体假设(数据稀疏,密钥分布均匀),您可以稍微简化一下,首先进行洗牌:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

如果满足假设,则应该:

  • 通过打乱稀疏对来减少打乱大小,而不是像密集数组那样,违反直觉Rows.
  • 仅使用更新来聚合数据(每个O(1))可能仅作为索引的子集进行接触。

然而,如果一个或两个假设不满足,您可以预期洗牌大小将会增加,而更新数量将保持不变。同时,数据偏差可能会让事情变得更糟。update - shuffle - merge设想。

Using Aggregator具有“强”类型Dataset:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

可以如下所示使用

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+

Note:

也可以看看SPARK-27296 https://issues.apache.org/jira/browse/SPARK-27296 - 用户定义聚合函数 (UDAF) 存在重大效率问题

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark UDAF以ArrayType作为bufferSchema性能问题 的相关文章

  • 迭代列表的奇怪速度差异

    我创建了两个重复两个不同值的长列表 在第一个列表中 值交替出现 在第二个列表中 一个值出现在另一个值之前 a1 object object 10 6 a2 a1 2 a1 1 2 然后我迭代它们 不对它们执行任何操作 for in a1 p
  • 如何在 Scala 2.11 中查找封闭源文件的名称

    在编译时 如何在 scala 2 11 中检索当前源文件 编写代码的位置 的名称 这是一种实际有效的方法 val srcFile new Exception getStackTrace head getFileName println sr
  • 在 C 中复制两个相邻字节的最快方法是什么?

    好吧 让我们从最明显的解决方案开始 memcpy Ptr const char a b 2 调用库函数的开销相当大 编译器有时不会优化它 我不会依赖编译器优化 但即使 GCC 很聪明 如果我将程序移植到带有垃圾编译器的更奇特的平台上 我也不
  • 如何关闭 Scala 中因方法重载而导致代码无法编译的特定隐式?

    我正忙着尝试自己回答这个问题 Scala Play 2 4 x 通过 anorm MySQL 处理扩展字符到 Java Mail https stackoverflow com questions 31417718 scala play 2
  • 当泛型类型与无界通配符一起使用时,不考虑类型参数绑定

    在我的项目中 我有一个这样的星座 trait F trait X A lt F def test x X X lt F x Trait X有一个类型参数 其上限为F 根据我的理解 类型X and X lt F 应该是等价的 但scalac2
  • 跨数据库的用户定义类型

    我有一个数据库 其中包含我在多个数据库中使用的常用函数 这些函数之一采用表作为参数 该参数是用户定义的类型 我想知道是否有办法从另一个数据库调用这个函数 我尝试在其他数据库中定义类型 如下所示 DECLARE bits as Common
  • 使用 Akka 1.3 的 actor 时,我需要注意生产者-消费者速率匹配吗?

    使用 Akka 1 3 时 我是否需要担心当生成消息的 Actor 生成消息的速度比使用消息的 Actor 的处理速度快时会发生什么 如果没有任何机制 在长时间运行的进程中 队列大小将增大以消耗所有可用内存 The doc http doc
  • Scala 中的随机列表[重复]

    这个问题在这里已经有答案了 我对 scala 中的随机播放列表有疑问 使用scala util Random 例如我有 val a cyan val b magenta val c yellow val d key val color Ra
  • 使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错

    这是一个 Spark Streaming 应用程序 它使用编码的 Kafka 消息Proto Buf Using scalapb图书馆 我收到以下错误 请帮忙 gt com google protobuf InvalidProtocolBu
  • R、Rcpp 与 Armadillo 中矩阵 rowSums() 与 colSums() 的效率

    背景 来自 R 编程 我正在扩展到 C C 形式的编译代码Rcpp 作为循环交换 以及一般的 C C 效果的实践练习 我实现了 R 的等效项rowSums and colSums 矩阵的函数Rcpp 我知道它们以 Rcpp 糖的形式存在 并
  • 为什么 Delphi 中的 ADO Next 记录处理速度变慢?

    我有一个多年前开发的 Delphi 4 程序 它使用Opus 直接访问 http sourceforge net projects directaccess 按顺序搜索 Microsoft Access 数据库并检索所需的记录 Delphi
  • linux perf:如何解释和查找热点

    我尝试了linux perf https perf wiki kernel org index php Main Page今天很实用 但在解释其结果时遇到了困难 我习惯了 valgrind 的 callgrind 这当然是与基于采样的 pe
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • 如何调用 Scala 抽象类型的构造函数?

    我试图弄清楚如何调用 Scala 抽象类型的构造函数 class Journey val length Int class PlaneJourney length Int extends Journey length class BoatJ
  • 过滤器的 Scala 集合类型

    假设您有一个 List 1 1 其类型为 List Any 这当然是正确的且符合预期 现在如果我像这样映射列表 scala gt List 1 1 map case x Int gt x case y String gt y toInt 结
  • pyspark加入多个条件

    我如何指定很多条件 当我使用pyspark时 join 例子 与蜂巢 query select a NUMCNT b NUMCNT as RNUMCNT a POLE b POLE as RPOLE a ACTIVITE b ACTIVIT
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • PHP 脚本不断执行 mmap/munmap

    我的 PHP 脚本包含一个循环 它只不过是回显和取消引用指针 如 tab othertab i gt 中的内容 直到昨天 这个脚本开始变得非常慢 比以前慢了 50 倍 之前 它一直运行良好 使用 strace 后 我发现 90 的情况下 脚
  • @tailrec为什么这个方法不编译为“包含不在尾部位置的递归调用”?

    tailrec private def loop V key String V key match case gt loop key 此方法无法编译并抱怨它 包含不在尾部位置的递归调用 有人可以向我解释一下发生了什么事吗 这个错误消息对我来
  • 正则表达式库基准

    我最近一直想知道正则表达式实现的性能 并且很难想出很多有用的信息 它很容易对浏览器 javascript 正则表达式性能进行基准测试 网上有很多工具 Chrome 和 Opera 中的 javascript 正则表达式实现几乎摧毁了所有其他

随机推荐