Spark 2.0 DataSets groupByKey 和 除法操作以及类型安全

2024-01-16

我对 Spark 2.0 DataSets 非常满意,因为它的编译时类型安全。但这里有几个我无法解决的问题,我也没有找到很好的文档。

问题#1 - 对聚合列进行除法运算 -考虑下面的代码 - 我有一个 DataSet[MyCaseClass],我想对 c1、c2、c3 和 sum(c4) / 8 进行 groupByKey。如果我只计算总和,下面的代码效果很好,但它给出了 diverge(8) 的编译时错误。我想知道我怎样才能实现以下目标。

final case class MyClass (c1: String,
                          c2: String,
                          c3: String,
                          c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

     myCaseClass.
       groupByKey(myCaseClass =>
          (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
          agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
          divide(8)). //this is breaking with exception
       show()

如果我删除 .divide(8) 操作并运行上面的命令,它会给出以下输出。

+-----------+-------------+
|        key|sum(c4)      |
+-----------+-------------+
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
+-----------+-------------+

问题 #2 - 将 groupedByKey 结果转换为另一个类型化 DataFrame -现在我的问题的第二部分是我想再次输出一个类型化的数据集。为此,我有另一个案例类(不确定是否需要),但我不确定如何映射分组结果 -

final case class AnotherClass(c1: String,
                          c2: String,
                          c3: String,
                          average: Double) 

 myCaseClass.
           groupByKey(myCaseClass =>
              (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
              agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception

但这再次失败,并出现异常,因为按键结果分组未直接与 AnotherClass 映射。

PS:非常欢迎任何其他实现上述目标的解决方案。


第一个问题可以通过一直使用类型化列来解决(KeyValueGroupedDataset.agg期望TypedColumn(-s)) 您可以将聚合结果定义为:

val eight = lit(8.0)
  .as[Double]  // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
  .divide(eight)
  .as[Double]  // Required
  .name("div(sum(c4), 8)")

并将其插入以下代码:

val myCaseClass = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)
).toDS

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(sumByEight)

to get

+-------+---------------+
|    key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]|          0.625|
+-------+---------------+

第二个问题是由于使用了不符合数据形状的类而导致的。正确的表示可能是:

case class AnotherClass(key: (String, String, String), sum: Double)

与上面定义的数据一起使用:

 myCaseClass
   .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
   .agg(typedSum[MyClass](_.c4).name("sum"))
   .as[AnotherClass]

会给出:

+-------+---+
|    key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+

but .as[AnotherClass]如果这里没有必要Dataset[((String, String, String), Double)]是可以接受的。

你当然可以跳过所有这些,然后mapGroups(尽管并非没有性能损失):

import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
 .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
 .mapGroups((group, iter) => group :+ iter.map(_.c4).sum)

有结果

+---+---+---+---+   
| _1| _2| _3| _4|
+---+---+---+---+
|  a|  b|  c|5.0|
+---+---+---+---+

reduceGroups可能是一个更好的选择:

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))

由此产生的Dataset:

+-------+-----------+    
|     _1|         _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 2.0 DataSets groupByKey 和 除法操作以及类型安全 的相关文章

随机推荐

  • R - 将向上对角线转换为行

    我得到一个矩阵 数据框或数据表 我想创建一个矩阵 其中向上 反向对角线作为行 其余单元格作为 NA 我能够做到这一点 但我认为 应该有一个更容易 更简单的解决方案 因此 任何解决方案都值得赞赏 作为一个例子 假设我得到以下 data tab
  • Grails 服务类交叉引用

    我意识到 Grails 服务类是 Spring 管理的单例 我还知道 您可以通过像这样的驼峰式大小写形式使用 serviceClassName 声明本地定义来引用另一个服务类 令我惊讶的是 我似乎无法像这样交叉引用服务类 class Fir
  • jsTree:如何从jstree中获取所有叶节点?

    我想从 jsTree 获取所有叶节点 节点的 ID 和文本 我没有使用复选框 ui jsTree Root A A1 A1 1 A2 A2 1 B B2 C C1 C1 1 我想要 jsTree 的叶节点列表 预期输出 A1 1 A2 1
  • Google Drive API 如何支持下载请求的内容范围? [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 Google Drive API 如何支持获取内容范围请求 App Engine 获取请求大小有限制 请支持范围请求 以便可以读取大文件 您可以提供
  • __unicode__() 不返回字符串

    我在 python 中有以下课程 class myTest def init self str self str str def unicode self return self str 并在其他一些文件中实例化 myTest 来尝试 un
  • Excel 单元格中的 OpenXml 和日期格式

    我正在尝试使用 OpenXML 创建 xlsx 格式的 Excel 文件 因为我需要在 Web 服务器上使用它 我在表格中填写数值没有任何问题 但是我正在努力在单元格中设置经典的日期格式 下面使用快速测试DocumentFormat Ope
  • 类型错误:预期序列或类似数组,得到估计器

    我正在开发一个对产品有用户评论的项目 除了我手动提取的一些其他特征之外 我正在使用 TfidfVectorizer 从数据集中提取特征 df pd read csv reviews csv header 0 FEATURES feature
  • 根据屏幕尺寸缩放字体大小?

    我正在使用 AngularJS 1 x 和 Angular Material 开发一个 Web 应用程序 我正在尝试创建一个带有大标题的登陆页面md display 4 它看起来是这样的 正如您所看到的 标题很好地填充了页面并且适合一行 然
  • 如何在 iOS 地图上叠加一个圆圈

    我有一个半径和一个位置 这就是我试图获取圆的边界矩形的方法 MKMapRect boundingMapRect CLLocationCoordinate2D tmp MKCoordinateSpan radiusSpan MKCoordin
  • Laravel 5.4 - 如何对同一自定义验证规则使用多个错误消息

    为了重用代码 我在名为的文件中创建了自己的验证器规则验证服务提供者 class ValidatorServiceProvider extends ServiceProvider public function boot Validator
  • SPRING:在 Spring 中由工厂实例化创建的 bean 中使用自动装配

    您好 我无法在另一个使用工厂方法实例化的 bean 中自动装配我的 bean class A private String name getters and setters for name class B Autowired privat
  • Lightgbm 提前停止无法正常工作

    我正在使用 lightgbm 来执行机器学习任务 我想使用早期停止来找到给定多个超参数的最佳树数 然而 lgbm 停止种植树木 同时仍在改进我的评估指标 下面我附上了我的规格 params max bin 128 num leaves 8
  • 在 AWS SES 上实施把手助手,以便在 HTML 电子邮件程序中条件性地呈现标记

    我们正在创建一个 HTML 电子邮件模板 该模板需要根据 JSON 数据中的属性之一的值有条件地呈现标记 我们发现在以下示例中 您可以创建自定义 Handlebar Helpers 来完成此操作 但在使用这些助手时 AWS SES 拒绝发送
  • 将 MATLAB 文件转换为 Octave

    我有一系列为 MATLAB 编写的实验 但最近我们尝试通过 Octave 来运行它们 我意识到它们大多是兼容的 但我遇到了一些问题 而且我发现的在线常见问题解答或说明都没有解决这些问题 这有点复杂 因为有多个 m 文件相互作用 不过 现在我
  • 在 asp.net 中上传文件之前如何检查文件类型?

    我们如何在不使用文件扩展名的情况下检查文件类型 例如jpg等格式 上传它们使用 asp net 和 c 我正在使用 vs 2008 asp net c TELERIK 控件 RadUpload 想象一下有人将文本文件扩展名更改为 jpg 并
  • Haskell 中类型表达式的 Lambda?

    Haskell 或特定的编译器是否有类似类型级 lambda 的东西 如果这甚至是一个术语 详细说明一下 假设我有一个参数化类型Foo a b并想要Foo b成为 Functor 的一个实例 有没有什么机制可以让我做类似的事情 instan
  • 如何在安装了 goclipse 的 eclipse 中运行 GO 项目

    我已经在 eclipse 中安装了 goclipse 并创建了一个新的 go 项目 现在这就是我所拥有的 我的 hello go 看起来像这样 package main import fmt func main fmt Println He
  • 何时在 Makefile 中使用空格或制表符?

    我正在创建一个使用条件 if 和 ifneq 的 makefile 我注意到 如果我使用 if 下一行应该用空格缩进 if d d then
  • 如何在 gdb 中使用带有 FS 或 GS​​ 基址的逻辑地址?

    gdb 提供了读取或写入特定的功能线性地址 例如 gdb x 1wx 0x080483e4 0x80483e4
  • Spark 2.0 DataSets groupByKey 和 除法操作以及类型安全

    我对 Spark 2 0 DataSets 非常满意 因为它的编译时类型安全 但这里有几个我无法解决的问题 我也没有找到很好的文档 问题 1 对聚合列进行除法运算 考虑下面的代码 我有一个 DataSet MyCaseClass 我想对 c