Spark - 如何组合/合并 Seq[Row] 中 Dataframe 中的元素以生成 Row

2023-12-12

首先我想说我被迫使用 Spark 1.6

我正在生成一个DataFrame来自这样的 JSON 文件:

{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}

The DataFrame好像:

+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203|  amith|
| 23|1204|  javed|
| 25|1205|  mendy|
| 24|1206|    rob|
| 23|1207| prudvi|
+---+----+-------+

我用这个做什么DataFrame就是按年龄分组,按id排序,过滤所有年龄组中学生人数超过1人的。我使用以下脚本:

import sqlContext.implicits._

val df = sqlContext.read.json("students.json")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val arrLen = udf {a: Seq[Row] => a.length > 1 }

val mergedDF = df.withColumn("newCol", collect_set(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol","age")

val filterd = mergedDF.filter(arrLen(col("newCol")))

现在当前的结果是:

[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]

我现在想要的是将这两个学生行合并到WrappedArray合而为一,例如id第一个学生和name第二个学生的。

为了实现这一点,我编写了以下函数:

def PrintOne(List : Seq[Row], age : String):Row  ={ 
      val studentsDetails = Array(age, List(0).getAs[String]("id"), List(1).getAs[String]("name")) 
      val mergedStudent= new GenericRowWithSchema(studentsDetails .toArray,List(0).schema)

      mergedStudent
    }

我知道这个函数可以解决问题,因为当我使用 foreach 测试它时,它会打印出预期值:

filterd.foreach{x => val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
                         println("merged student: "+student)
                   }

OutPut:

merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]

但是,当我尝试在地图内执行相同的操作来收集返回值时,问题就开始了。

如果我在没有编码器的情况下运行:

val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}

我得到以下异常:

线程“main”中的异常 java.lang.UnsupportedOperationException:否 找到 org.apache.spark.sql.Row 的编码器 - 字段(类:“org.apache.spark.sql.Row”,名称:“_2”) - 根类:“scala.Tuple2”

当我尝试生成一个Econder就我自己而言,我也失败了:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
    implicit val encoder = RowEncoder(filterd.schema)

    val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder)

类型不匹配;成立 : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] 必需:org.apache.spark.sql.Encoder[(字符串, org.apache.spark.sql.Row)]

我怎样才能提供正确的编码器,或者更好的是避免它?

我被告知要避免使用映射+自定义函数,但我需要应用的逻辑比仅从每一行中选取一个字段更复杂。将多个字段组合起来,检查行的顺序以及值是否为空将更加重要。据我所知,只需使用自定义函数就可以解决它。


的输出map属于类型(String, Row)因此它不能使用编码RowEncoder独自的。您必须提供匹配的元组编码器:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = Encoders.tuple(
  Encoders.STRING,
  RowEncoder(
    // The same as df.schema in your case
    StructType(Seq(
      StructField("age", StringType), 
      StructField("id", StringType),
      StructField("name", StringType)))))

filterd.map{row => (
  row.getAs[String]("age"),
  PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}(encoder)

总的来说,这种方法看起来像是一种反模式。如果你想使用更实用的风格,你应该避免Dataset[Row]:

case class Person(age: String, id: String, name: String)

filterd.as[(Seq[Person], String)].map { 
  case (people, age)  => (age, (age, people(0).id, people(1).name))
}

or udf.

另请注意o.a.s.sql.catalyst包,包括GenericRowWithSchema,主要供内部使用。除非有必要,否则最好o.a.s.sql.Row.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark - 如何组合/合并 Seq[Row] 中 Dataframe 中的元素以生成 Row 的相关文章

  • Scala:将整个列表的 Either 与每个元素的 Either 组合

    我有一个 Either 列表 它代表错误 type ErrorType List String type FailFast A Either ErrorType A import cats syntax either val l List
  • 如何向数据框spark添加标题和列?

    我有一个数据框 我想在其中添加标题和第一列 手动 这是数据框 import org apache spark sql SparkSession val spark SparkSession builder master local appN
  • Scala 功能设计模式目录

    一周以来我一直在阅读 Scala 编程 作者一步一步地介绍了该语言的元素 但我仍然很困惑何时使用演员 闭包 柯里化等功能性的东西 我正在寻找功能结构的典型用例或最佳实践的目录 我并不是说在 Scala 中重新实现像 GoF 这样的众所周知的
  • 从 pySpark 中的字典构建一行

    我正在尝试在 pySpark 1 6 1 中动态构建一行 然后将其构建到数据帧中 总体思路是扩展结果describe例如 包括偏斜和峰度 这是我认为应该起作用的 from pyspark sql import Row row dict C0
  • Scala 将递归有界类型参数(F 界)转换为类型成员

    我将如何转换 trait Foo A lt Foo A 给类型成员 也就是说 我想要以下内容 trait Foo type A lt Foo type A 但我遇到了困难 因为名称 A 已在类型细化中使用 这个问题是类似的 并衍生自 通过类
  • 在 Databricks / Spark 中的 SQL 中为变量分配动态值

    我觉得我一定在这里遗漏了一些明显的东西 但我似乎无法在 Spark SQL 中动态设置变量值 假设我有两张桌子 tableSrc and tableBuilder 我正在创建tableDest 我一直在尝试变体 SET myVar FLOA
  • 理解 scala 的 _ 与 Any/Nothing

    如果一个类具有协变类型参数 例如Iterable A http www scala lang org archives downloads distrib files nightly docs 2 10 1 library index ht
  • 使用列的长度过滤 DataFrame

    我想过滤一个DataFrame使用与列长度相关的条件 这个问题可能很简单 但我在SO中没有找到任何相关问题 更具体地说 我有一个DataFrame只有一个Column哪一个ArrayType StringType 我想过滤DataFrame
  • scala.math.BigDecimal :1.2 和 1.20 相等

    将 Double 或 String 转换为 scala math BigDecimal 时如何保持精度和尾随零 用例 在 JSON 消息中 属性的类型为 String 值为 1 20 但是在 Scala 中读取这个属性并将其转换为 BigD
  • 将 Apache Zeppelin 连接到 Hive

    我尝试将我的 apache zeppelin 与我的 hive 元存储连接起来 我使用 zeppelin 0 7 3 所以没有 hive 解释器 只有 jdbc 我已将 hive site xml 复制到 zeppelin conf 文件夹
  • PySpark DataFrame 上分组数据的 Pandas 式转换

    如果我们有一个由一列类别和一列值组成的 Pandas 数据框 我们可以通过执行以下操作来删除每个类别中的平均值 df DemeanedValues df groupby Category Values transform lambda g
  • 将当前类作为 scala 中的参数传递

    如何传递当前类作为参数 在java中我们这样做 mymethod this class or mymethod MyClass class 如何将 scala 当前类传递给此方法 this getClass or classOf MyCla
  • 在 Spark 2.4 上的 pyspark.sql.functions.max().over(window) 上使用 .where() 会引发 Java 异常

    我关注了一个帖子堆栈溢出 https stackoverflow com questions 48829993 groupby column and filter rows with maximum value in pyspark 488
  • Scala:“递归值...需要类型”,但我只使用 Java 类型

    object Rec extends App val outStream new java io ByteArrayOutputStream val out new java io PrintStream new java io Buffe
  • xsbt 插件 1.0.0-M7 和 scalatra

    我尝试在我的 scalatra 项目中将 xsbt 插件升级到 1 0 0 M7 但 scalatra 似乎与此版本不兼容 当我尝试重新加载项目时 出现以下错误 我尝试过 scalatra 2 3 0 版本 问候 德斯 java lang
  • 读取不同文件夹深度的多个 csv 文件

    我想递归地将给定文件夹中的所有 csv 文件读入 Spark SQLDataFrame如果可能的话 使用单一路径 我的文件夹结构如下所示 我想包含具有一个路径的所有文件 resources first csv resources subfo
  • 承诺的反面是什么?

    承诺代表将来可能可用 或无法实现 的值 我正在寻找的是一种数据类型 它表示将来可能变得不可用的可用值 可能是由于错误 Promise a b TransitionFromTo
  • Akka中有轻量级的actor吗?

    我的用例非常简单 在两个对象之间交换少量 现在我正在从 Scala Actors 迁移到 Akka 但是我再也找不到那些轻量级 Actors 使用Akka 我不仅需要为Actor创建创建ActorSystem Props 还需要照顾Acto
  • 创建涉及 ArrayType 的 Pyspark 架构

    我正在尝试为我的新 DataFrame 创建一个架构 并尝试了括号和关键字的各种组合 但无法弄清楚如何完成这项工作 我目前的尝试 from pyspark sql types import schema StructType StructF
  • 如何在scala中生成n-gram?

    我正在尝试在 scala 中编写基于 n gram 的分离新闻算法 如何为大文件生成 n gram 例如 对于包含 蜜蜂是蜜蜂中的蜜蜂 的文件 首先它必须选择一个随机的 n 元语法 例如 蜜蜂 然后它必须寻找以 n 1 个单词开头的 n 元

随机推荐