如何定义自定义聚合函数来对向量列求和?

2023-12-04

我有一个两列的数据框,ID类型的Int and Vec类型的Vector (org.apache.spark.mllib.linalg.Vector).

DataFrame 如下所示:

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....

我想做一个groupBy($"ID")然后通过对向量求和来对每个组内的行应用聚合。

上述示例的期望输出是:

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...

可用的聚合函数将不起作用,例如df.groupBy($"ID").agg(sum($"Vec")将导致 ClassCastException。

如何实现自定义聚合函数,使我能够对向量或数组求和或任何其他自定义操作?


火花 >= 3.0

您可以使用Summarizer with sum

import org.apache.spark.ml.stat.Summarizer

df
  .groupBy($"id")
  .agg(Summarizer.sum($"vec").alias("vec"))

火花

就我个人而言,我不会为 UDAF 烦恼。不仅冗长而且速度不快(Spark UDAF以ArrayType作为bufferSchema性能问题)相反,我会简单地使用reduceByKey / foldByKey:

import org.apache.spark.sql.Row
import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.ml.linalg.{Vector, Vectors}

def dv(values: Double*): Vector = Vectors.dense(values.toArray)

val df = spark.createDataFrame(Seq(
    (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
    (2, dv(7,5,0)), (2, dv(3,3,4)), 
    (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
  ).toDF("id", "vec")

val aggregated = df
  .rdd
  .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
  .foldByKey(BDV.zeros[Double](3))(_ += _)
  .mapValues(v => Vectors.dense(v.toArray))
  .toDF("id", "vec")

aggregated.show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+

只是为了比较“简单”的 UDAF。所需进口:

import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
  UserDefinedAggregateFunction}
import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes}
import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray

类定义:

class VectorSum (n: Int) extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
    def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
    def dataType = SQLDataTypes.VectorType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, Array.fill(n)(0.0))
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) {
        val buff = buffer.getAs[WrappedArray[Double]](0) 
        val v = input.getAs[Vector](0).toSparse
        for (i <- v.indices) {
          buff(i) += v(i)
        }
        buffer.update(0, buff)
      }
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
      val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
      for ((x, i) <- buff2.zipWithIndex) {
        buff1(i) += x
      }
      buffer1.update(0, buff1)
    }

    def evaluate(buffer: Row) =  Vectors.dense(
      buffer.getAs[Seq[Double]](0).toArray)
} 

以及一个示例用法:

df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show

// +---+--------------+
// | id|           vec|
// +---+--------------+
// |  1| [5.0,2.0,7.0]|
// |  2|[10.0,8.0,4.0]|
// |  3|[7.0,15.0,9.0]|
// +---+--------------+

也可以看看:如何在 Spark SQL 中查找分组向量列的平均值?.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何定义自定义聚合函数来对向量列求和? 的相关文章

  • 使用intellij Scala编译错误

    我正在尝试使用 intelliJ 14 1 4 编译混合 java scala 代码 但它一直给我这个错误 Error Compiling SBT component compiler interface 2 9 2 52 0 我尝试降级我
  • Scala 将字符串转换为映射

    转换这个最快的方法是什么 a ab b cd c cd d de e ef f fg 进入 scala 中的可变映射 我从 500MB 文件中读取了这个输入字符串 这就是我关心速度的原因 如果您的 JSON 像您的示例一样简单 即一系列键
  • 凿子“Enum(UInt(), 5)”失败

    当我尝试使用 Chisel 构建 FSM 时 我使用了 Enum 如 Chisel 教程所述 然而 我遇到了这样的错误 my code val sIdle s1 s2 s3 s4 Nil Enum UInt 5 但是 当我执行时sbt ru
  • 生成 k 个成对独立的哈希函数

    我正在尝试实施一个计数最小草图 http en wikipedia org wiki Count Min sketchScala中的算法 所以我需要生成k个成对独立的哈希函数 这是一个比我以前编写过的任何东西都低的级别 除了算法类之外 我对
  • 当恰好有一个选项非空时执行某项操作

    如果两个选项之一非空 我想计算一些东西 显然这可以通过模式匹配来完成 但是有更好的方法吗 o1 o2 match case Some o None gt Some compute o case None Some o gt Some com
  • Scala 插入列表中的特定位置

    这是我确实解决的问题 但是作为一个完全命令式的 Scala 菜鸟 我觉得我发现了一些完全不优雅的东西 任何改进的想法表示赞赏 val l1 4 1 2 3 4 Nil original list val insert List 88 99
  • scala 元组拆包

    我知道这个问题已经以不同的方式出现过很多次 但我仍然不清楚 有没有办法达到以下目的 def foo a Int b Int foo a b right way to invoke foo foo getParams is there a w
  • 通过Listener获取Spark thrift服务器查询中读取的行数

    我正在尝试为我们的 ST 服务器构建一个监控系统 到目前为止 诸如记录查询 检索的行 红色和花费的时间之类的事情都很好 我已经实现了一个自定义侦听器 我能够毫无问题地检索查询和时间 侦听SparkListenerSQLExecutionSt
  • 如何抑制 EMR 上运行的 Spark-sql 的 INFO 消息?

    我正在 EMR 上运行 Spark 如中所述在 Amazon Elastic MapReduce 上运行 Spark 和 Spark SQL https aws amazon com articles 4926593393724923 本教
  • 在 Scala 中定义具有多个隐式参数的函数

    如何定义具有多个隐式参数的函数 def myfun arg String implicit p1 String implicit p2 Int doesn t work 它们必须全部放入一个参数列表中 并且该列表必须是最后一个 def my
  • 如何将参数传递给用户定义函数?

    我有一个用户定义的函数 calc udf calculate FloatType param1 A result df withColumn col1 calc col type col pos groupBy pk sum events
  • 抽象类型与类型参数

    在什么情况下抽象类型应该优先于类型参数 添加到我的之前关于抽象类型与参数的回答 https stackoverflow com questions 1154571 scala abstract types vs generics 11547
  • 使用值类参数的 Mockito 存根方法失败并出现 NullPointerException

    使用类型化值类作为 ID 是 Scala 中的常见模式 然而 在存根以值类作为参数的方法时 Mockito 似乎遇到了问题 在下面的示例中 第一个具有实际值的存根工作得很好 但第二个使用参数匹配器的存根会抛出 NullPointerExce
  • 登录模块控制标志在 JAAS 配置中不可用 - Scala Kafka

    尝试使用 kerberos 身份验证连接到 Kafka 时遇到问题 使用 scala 和我的jaas config看起来像这样 KafkaClient com sun security auth module Krb5LoginModule
  • 在使用 Phoenix 4.5 的 CDH 5.4 上运行 Spark 作业时未找到 PhoenixOutputFormat

    我通过重新编译源代码设法在 Cloudera CDH 5 4 上配置 Phoenix 4 5 sqlline py效果很好 但火花有问题 spark submit class my JobRunner master yarn deploy
  • Spark sql 每组前 n 个

    我怎样才能获得每组的前n名 比如说前10名或前3名 spark sql http www xaprb com blog 2006 12 07 how to select the firstleastmax row per group in
  • 如何区分spark中的操作是转换还是动作?

    最近在学习spark 对transformation和action操作很困惑 我阅读了spark文档和一些关于spark的书籍 我知道action会导致spark作业在集群中执行 而transformation则不会 但是spark的api
  • Spark中分布式读取CSV文件

    我正在开发一个 Spark 处理框架 它读取大型 CSV 文件 将它们加载到 RDD 中 执行一些转换 最后保存一些统计数据 相关 CSV 文件平均大小约为 50GB 我正在使用 Spark 2 0 我的问题是 当我使用sparkConte
  • Pyspark 应用程序仅部分利用 dataproc 集群资源

    我的 pyspark 应用程序在 106 36 MB 数据集 817 270 条记录 上运行 UDF 使用常规 python lambda 函数大约需要 100 小时 我创建了一个 Google Dataproc 集群 其中包含 20 个工
  • 在无形状中,有两个列表,其中一个包含另一个的类型类

    在无形中 我正在尝试编写一个需要两个 HList 的函数l1 and l2任意长度 具有以下属性 的长度l1 and l2是相同的 l2包含的确切类型l1 包装在常量外部类型构造函数中 So if l1 was 1 1 2 hello HN

随机推荐

  • 为什么我们需要为某些类型计算的输出指定一个精炼类型(或其等效的 Aux)?

    In https jto github io articles typelevel quicksort 我们接触到一个Sum键入谁的apply看起来像这样 def apply A lt Nat B lt Nat implicit sum S
  • TMP:如何推广向量的笛卡尔积?

    有一个优秀的 C 解决方案 实际上有 2 个解决方案 递归和非递归 整数向量向量的笛卡尔积 为了说明 简单起见 让我们只关注非递归版本 我的问题是 如何使用模板来概括这段代码以获取std tuple的齐次向量如下所示 2 5 9 foo b
  • 为什么 UIAlertView 不显示?

    由于某种原因 屏幕变暗并冻结 未显示警报 有人可以帮忙吗 提前致谢 else UIAlertView alert UIAlertView alloc initWithTitle Hello message Hello delegate se
  • C# MVC2 Jqgrid - 进行服务器端分页的正确方法是什么?

    我有一个 jqgrid 其中数据库表有几千行 但 jqrid 一次只显示 15 行 它应该显示得非常快 查询 15 行并不需要很长时间 但它需要 10 20 秒 这表明它每次都检索整个表 网格定义如下 Products jqGrid url
  • 如何设置ASP.NET SessionState读写LOCK超时?

    我有一个使用 ASP NET 会话状态的 WCF Web 服务 WCF 为每个请求在会话上设置读写锁 这意味着我的 Web 服务每次只能处理每个用户一个请求 这会损害我们 AJAX 应用程序的感知性能 所以我正在尝试找到一种方法来绕过这个限
  • Flask 查询 Mongodb 速度慢

    我使用 Flaks 构建一个管理网站来监控 MongoDB 用户数据 我的查询正在运行 但速度非常慢 加载 HTML 大约需要 3 5 秒 我测试了插入查询 它的工作时间不到 0 5 秒 我不认为这是服务器问题 烧瓶代码 A 它使用pymo
  • 加载图像时 WP8 内存不足错误

    我正在开发 Windows Phone 8 应用程序 我正在研究 Coverflow 功能 我正在尝试加载 600 个项目 但它总是显示Out of Memory Error Code
  • 在存在 getter 的情况下使用 Mobx makeObservable 让 setter 成为一个动作

    在 mobx 中 如果我想使用继承 我需要使用 makeObservable 而不是 makeAutoObservable 但是使用 makeObservable 需要我命名改变状态的操作 那么如何将 setter 声明为操作 因为它与 g
  • 检测并解析 JSON 文件中的转义字符“\”?

    我的 JSON 文件数据有问题 我正在使用来自谷歌的以下链接 http www google com finance company news q AAPL output json 当我想解析数据并将其显示在屏幕上时 就会出现问题 由于某种
  • 如何在Windows中的Visual Studio Code中创建tasks.json目录?

    我正在尝试编写一个任务 使用 Visual Studio Code 任务 在tasks json 中 为 Windows 用户创建一个目录 使用mkdir命令 运行良好除非该文件夹已存在 任务 json label release crea
  • Google Drive API 调用在 Fusiontables 上插入公共共享权限会导致内部错误

    我一直在尝试使用 Google Drive API 来使 Fusiontable 公开可读 但未能使其正常工作 我能够使用 OAuth 2 0 Playground 插入其他 Google Drive 文档的公共共享权限 但对于 Fusio
  • 如何在c#中执行多个oracle查询

    我正在尝试执行多个 Oracle 选择查询 如该帖子答案中所述here但我遇到了异常 如图所示 与oracle网站上解释的方式相同here 顺便说一句 是否有办法处理从这些查询之一找不到行的情况 string cmdstr begin op
  • R 中的“导入为”

    有没有办法在 R 中导入具有其他名称的包 就像您可能使用的那样import as在Python中 例如import numpy as np 我已经开始使用package function最近为了避免之间的冲突 比如说 Hmisc summa
  • 使用 ggplot2 绘制 xts 对象

    我想使用 ggplot2 绘制 xts 对象 但出现错误 这是我正在做的事情 dates lt c 2014 10 01 2014 11 01 2014 12 01 2015 01 01 2015 02 01 value lt as num
  • Android:按键盘上的“完成”按钮

    使用 appium 实现 Android 应用自动化 无法单击手机键盘上显示的 完成 按钮 有人可以帮助我们有什么方法可以使用 ADB Shell 命令单击 Android 手机完成按钮吗 在 Android 中 您无法直接单击键盘按键 除
  • C++ 从二进制文件中写入和读取双精度数

    我想对占用过多 RAM 的程序执行磁盘 I O 操作 我使用双精度矩阵 并认为将它们作为字节写入磁盘是最快的方法 我需要保留双精度 如何做到便携呢 我找到了这段代码 here 但作者说它不可移植 include
  • 将项目导入 Eclipse

    我有一个简单的问题如何将整个项目源导入 Eclipse 以便我可以轻松浏览它 具体来说 我已经下载了Maven源代码http maven apache org download html我只想像 Eclipse 中的其他项目一样查看它 我尝
  • JavaScript 中 [[prototype]] 属性的双括号有何意义?

    我知道每个 JavaScript 对象都有一个名为的内部属性 Prototype 一些实现允许通过名为的属性访问它 proto 而其他则不然 有什么特殊意义吗brackets该物业周围 它是一个 内部财产 对象的 来自ECMAScript
  • 双向链表——垃圾回收

    我创建了一个双向链表 我的列表仅包含 2 个元素 假设node1 and node2 并且我想删除head指向第一个节点的指针 node1 在列表中 因为在 Cpython 中 垃圾收集的主要算法是引用计数 现在我的问题是 示例1 如果我设
  • 如何定义自定义聚合函数来对向量列求和?

    我有一个两列的数据框 ID类型的Int and Vec类型的Vector org apache spark mllib linalg Vector DataFrame 如下所示 ID Vec 1 0 0 5 1 4 0 1 1 1 2 1