Spark SQL - 从 sql 函数生成数组的数组

2024-03-26

我想创建一个数组的数组。这是我的数据表:

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
    Testing(null, 21, 905),
    Testing("Noelia", 26, 1130),
    Testing("Pilar", 52,  1890),
    Testing("Roberto", 31, 1450)
 ))

// Convert RDD to a DataFrame 
val df = sqlContext.createDataFrame(x) 

// For SQL usage we need to register the table
df.registerTempTable("df")

我想创建一个整数列“age”的数组。为此,我使用“collect_list”:

sqlContext.sql("SELECT collect_list(age) as age from df").show

但现在我想生成一个包含上面创建的多个数组的数组:

 sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show

但这不行,还是用org.apache.spark.sql.functions.array这个函数。有任何想法吗?


好吧,事情再简单不过了。让我们考虑一下您正在处理的相同数据,并从那里逐步进行

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
  Testing(null, 21, 905),
  Testing("Noelia", 26, 1130),
  Testing("Pilar", 52, 1890),
  Testing("Roberto", 31, 1450)
))

// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)

// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show

// +----------------+
// |             age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+

sqlContext.sql("select collect_list(collect_list(age),     collect_list(salary)) as arrayInt from df").show

正如错误消息所示:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]

collest_list仅需要一个参数。让我们检查一下文档here http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24.

它实际上需要一个参数!但让我们进一步了解函数对象的文档。您似乎已经注意到,数组函数允许您从 Column 或重复的 Column 参数创建新的数组列。那么让我们使用它:

sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)

数组函数确实从由collect_list预先创建的列列表中创建了一个列,包括年龄和薪水:

// +-------------------------------------------------------------------+
// |arrayInt                                                           |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+

我们接下来该去哪里?

您必须记住,DataFrame 中的 Row 只是由 Row 包装的另一个集合。

我要做的第一件事就是处理该系列。那么我们如何展平WrappedArray[WrappedArray[Int]] ?

Scala 有点神奇,你只需要使用.flatten

import scala.collection.mutable.WrappedArray

val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] =
  sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
    .first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))

firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)

现在让我们将其包装在 UDF 中,以便我们可以在 DataFrame 上使用它:

def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))

由于我们注册了 UDF,我们现在可以在 sqlContext 中使用它:

sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)

// +---------------------------------------+
// |arrayInt                               |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+

我希望这有帮助 !

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL - 从 sql 函数生成数组的数组 的相关文章

  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • 如何从字符串列中提取数字?

    我的要求是从列中的评论列中检索订单号comment并且总是开始于R 订单号应作为新列添加到表中 输入数据 code id mode location status comment AS SD 101 Airways hyderabad D
  • Haskell scala 互操作性

    我是 Scala 初学者 来自面向对象范式 在了解 Scala 的函数式编程部分时 我被引导到 Haskell 纯函数式编程语言 探索 SO 问题答案 我发现 Java Haskell 具有互操作性 我很想知道 Scala Haskell
  • 如何以最佳方式传递元组参数?

    如何以最佳方式传递元组参数 Example def foo Int Int def bar a Int b Int 现在我想传递的输出foo to bar 这可以通过以下方式实现 val fooResult foo bar fooResul
  • 以编程方式启动 Scala REPL?

    我想从命令行启动 Scala Swing 应用程序 然后在应用程序启动后 放入 Scala REPL 中以用作控制界面 理想情况下 我还想预先绑定一些变量名称 更好的是使用 REPL 的 Java2D 终端模拟器 但我找不到任何合适的东西
  • 在 Akka/Scala 中使用带有 future 的 mapTo

    我最近开始使用 Akka Scala 编码 遇到了以下问题 通过范围内的隐式转换 例如 implicit def convertTypeAtoTypeX a TypeA TypeX TypeX just some kinda convers
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 如何使用 PySpark 预处理图像?

    我有一个项目 需要为 1 设置大数据架构 AWS S3 SageMaker 的概念验证使用 PySpark 预处理图像 2 执行 PCA and 3 训练一些机器或深度学习模型 我的问题是了解如何使用 PySpark 操作图像数据 但无法在
  • 如何从命令行向 REPL 添加导入?

    如何使 REPL 导入命令行中给出的包 Sample scala someMagicHere import sys error scala gt imports 1 import scala Predef 162 terms 78 are
  • 具有两个通用参数的上下文边界

    在 Scala 中 我可以使用上下文边界 def sort T Ordered t Seq T 与以下意思相同 def sort T t Seq T implicit def Ordered T 如果我有一个带有两个泛型参数的类怎么办 IE
  • Akka Stream Graph 恢复问题

    我创建了一个图表来并行化具有相同输入的两个流 这些流产生 Future Option Entity 如果 flowA 失败 我想返回 Future None 但恢复似乎没有被调用 val graph Flow Input Future Op
  • Scala 的代码覆盖率工具 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Spark问题中读取大文件 - python

    我已经使用 python 在本地安装了 Spark 并在运行以下代码时 data sc textFile C Users xxxx Desktop train csv data first 我收到以下错误 Py4JJavaError Tra
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • pyspark flatmat 错误:TypeError:“int”对象不可迭代

    这是我书中的示例代码 from pyspark import SparkConf SparkContext conf SparkConf setMaster spark chetan ThinkPad E470 7077 setAppNam
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • andThen 类型不匹配的 Scala 链接函数

    我有一堆函数可以清理文本并将它们分成单词 最小的例子 val txt Mary had a little nlamb val stopwords Seq a def clean text String String text replace
  • Slick和bonecp:org.postgresql.util.PSQLException:FATAL:抱歉,太多客户端已经错误

    当我在本地开发应用程序时 我使用以下命令启动我的 play2 应用程序sbt run 我喜欢如何更改代码 然后重新加载浏览器以查看我的更改 在大约 10 次代码更改之后 我收到 postgresql 太多连接错误 见下文 我的数据库连接使用
  • 解决“Show”类型类实例的隐式问题

    我正在努力使Gender实施Show类型类 scala gt trait Gender extends Show Gender defined trait Gender scala gt case object Male extends G

随机推荐

  • mongodb show dbs list数据库失败

    我是 mongodb 的新手 我刚刚在我的MAC上安装了mongoDB 看完这个 YouTube 视频后 在 mongo shell 中 我输入 show dbs 并得到有线输出 请帮助我理解并解决这个问题 gt show dbs 2017
  • 在 JDBC 中插入单引号以进行 SQL 查询不起作用

    我在通过 Oracle JDBC 在 JAVA 的准备好的语句中使用单引号时遇到了处理单引号的问题 假设我们有一张 Restaurant 表 其中一列 Restaurant name 的值为 1 Jack s Deli 我想使用一个简单的准
  • 使用另一个对话框的功能更改 jQuery-UI 对话框的标题

    为什么第二个 jQuery UI 对话框标题在弹出时不改变 第一个对话框我使用以下命令更改框的标题 attr title Confirm 它将第一个框的标题更改为 确认 就像它应该有的那样 现在 当第二个框弹出时 它应该将标题更改为 消息
  • 将 std::hash 专门化为依赖类型

    我已经定义了这个模板类结构 template
  • 尝试获取已安装应用程序列表时出现 TransactionTooLargeException

    作为我的应用程序的一部分 我通过使用 ApplicationPackageManager getInstalledApplications 获取设备上安装的应用程序列表 但对于某些用户 我收到崩溃报告说 android osBinderPr
  • 视图漂浮在所有 ViewController 之上

    在 iOS 上 视图是否可能始终漂浮在所有其他视图之上 我问这个是因为我想要实现的是一个漂浮在 ViewController 之上的视图 然后一个模态视图控制器滑入 同时该特定视图仍然漂浮在该模态视图控制器上 希望你明白我想说的 有 您可以
  • 程序员多久会被要求编写一个 makefile 文件? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 从当前文化中获取货币?

    有没有办法从应用程序文化设置动态获取当前信息 基本上 如果用户将文化设置为美国 我想知道货币是美元 或者如果他们将其设置为英国 我想知道英镑等 等等 这样我就可以在付款时将此信息发送给 PayPal 使用 RegionInfo ISOCur
  • 根据当前值更新 MongoDB 中的值

    我想做这样的事情 但是this关键字似乎没有在更新语句中设置 db items update foo set bar this foo false true 我必须使用eval来完成这个 是的 您不能引用修饰符中的其他字段 您必须使用 db
  • 使 MongoDB 中的表字段可文本搜索

    先决条件 已使用集合创建数据库posts它的架构如下 module exports function mongoose var Schema mongoose Schema var postSchema new Schema postID
  • Java TA-Lib 文档 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找有关的文档TA Lib http www ta lib org index html在爪哇
  • 使用 SWIG 和 Python/C API 包装返回 std::map 的函数

    我想包装一个 C 例程 它返回一个std map整数和指向 C 类实例的指针 我在使用 SWIG 时遇到困难 希望能提供任何帮助 我试图通过一个简单的例子来将这个问题归结为它的本质 标题test h定义如下 File test h incl
  • 计算单元测试运行期间发生的GC数量[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我目前正在编写一个单元测试来查看给定方法的性能影响 从实践中我们观察到 当前在给定方法的执行过程中发生了很多GC 我想知道是否可以查看从
  • ASIHTTPRequest dealloc 和 EXC_BAD_ACCESS 问题

    我使用一组 ASIHTTPRequest 包装器 AsyncImageLoader 来下载 UITableView 中单元格的图像 我在处理 ASIHTTPRequests 生命周期时遇到问题 如果我释放它们 如果我在它们尝试加载图像时继续
  • 警告:require_once():http:// 包装器在服务器配置中被allow_url_include=0 禁用

    我试图通过以下方式在页面中包含 php 文件 require once http localhost web a php 我收到错误 Warning require once http wrapper is disabled in the
  • Jodatime的LocalDateTime第一次使用时很慢

    我目前正在一个 java 项目中测试一些 webapp 技术 并且想知道为什么页面有时加载速度很快 有时需要近 5 秒才能加载 我终于发现是这条线 LocalDateTime now new LocalDateTime 第一次调用时 需要很
  • 使用 par 时图例框宽度不正确

    我有问题 我的图例太大 我的代码 par mfrow c 1 2 hist alvsloss breaks 100 freq F main Histogramm density curve gaussian kernel n and fit
  • Dart - 试图理解“工厂”构造函数的价值

    如果我理解正确的话 A factory constructor affords an abstract class to be instantiated by another class despite being abstract 例如
  • 仅调用一个 Paint 事件

    我的问题是我有 8 个图片框 但一次只有其中一个调用其绘制方法 我的代码有点太大 所以我尝试尽可能地将其范围缩小到受影响的部分 我最好的猜测是 这并不是我的代码中的错误 而是对绘制事件如何工作的误解 我有一个继承自 PictureBox 的
  • Spark SQL - 从 sql 函数生成数组的数组

    我想创建一个数组的数组 这是我的数据表 A case class for our sample table case class Testing name String age Int salary Int Create an RDD wi