如何从结构元素的嵌套数组创建 Spark DataFrame?

2023-11-27

我已将 JSON 文件读入 Spark。该文件具有以下结构:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

我理想地想要的是一个带有“cde”、“cdeInternal”、“message”列的 DataFrame...如下所示

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...

我设法使用“explode”将“tweets”数组中的元素提取到名为“tweets”的列中

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

如何选择结构内的所有列并从中创建一个 DataFrame?如果我的理解是正确的,爆炸不适用于结构。

任何帮助表示赞赏。


处理此问题的一种可能方法是从模式中提取所需的信息。让我们从一些虚拟数据开始:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._


case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

和一个辅助函数:

def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

最终结果:

df.select(children("bar", df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从结构元素的嵌套数组创建 Spark DataFrame? 的相关文章

  • 从数据框中按索引删除行

    我有一个数组wrong indexes train其中包含我想从数据框中删除的索引列表 0 63 151 469 1008 要删除这些索引 我正在尝试这样做 df train drop wrong indexes train 但是 代码失败
  • 如何解决使用 Spark 从 S3 重新分区大量数据时从内存中逐出缓存的表分区元数据的问题?

    在尝试从 S3 重新分区数据帧时 我收到一个一般错误 Caused by org apache spark SparkException Job aborted due to stage failure Task 33 in stage 1
  • 使用已知模式保存空 DataFrame (Spark 2.2.1)

    是否可以使用已知模式保存一个空的 DataFrame 以便将该模式写入文件 即使它有 0 条记录 def example spark SparkSession path String schema StructType val datafr
  • 用缺失的日期填充其他列 Nan Pandas DataFrame

    我实际上是从几个 Excel 文件中提取数据来监控我的每日卡路里摄入量 我设法使用列表理解来生成日期 我尝试使用合并或连接 但它不起作用 ValueError 您正在尝试合并对象和 float64 列 date list 2021 05 2
  • Java时间转正常格式

    我有 Java 时间1380822000000 我想转换为我可以阅读的内容 import java util Date object Ws1 val a new Date 1380822000000 toString 导致异常 warnin
  • 如何使用 zio-test 测试异常情况

    我有以下功能 我想测试 def people id Int RIO R People 如果有 People 则此函数返回 Peopleid 分别 如果没有则失败 例如 IO fail ServiceException s No People
  • 聚合函数在数据框中创建不需要的向量

    我在函数中创建数据帧时遇到了一个奇怪的问题 但是 在 data frame 之外使用相同的方法效果很好 这是基本函数 我用它来计算数据集的平均值 标准差和标准误差 aggregateX lt function formula dataset
  • 缩放数据框的每一列

    我正在尝试缩放数据框的每一列 首先 我将每一列转换为向量 然后使用 ml MinMax Scaler 除了简单地重复它之外 是否有更好 更优雅的方法将相同的函数应用于每一列 import org apache spark ml linalg
  • 动态过滤 pandas 数据框

    我正在尝试使用三列的阈值来过滤 pandas 数据框 import pandas as pd df pd DataFrame A 6 2 10 5 3 B 2 5 3 2 6 C 5 2 1 8 2 df df loc df A gt 0
  • DataFrame 分区到单个 Parquet 文件(每个分区)

    我想重新分区 合并我的数据 以便将其保存到每个分区的一个 Parquet 文件中 我还想使用 Spark SQL partitionBy API 所以我可以这样做 df coalesce 1 write partitionBy entity
  • 将多个 Future[Seq] 连接成一个 Future[Seq]

    如果没有 Future 这就是我将所有较小的 Seq 组合成一个大 Seq 的方式flatmap category getCategoryUrlKey id Int Seq Meta main method val appDomains S
  • 如何从 data.frame 中删除列?

    不是 你怎么 但更多的是 你怎么 如果有人给你一个包含 200 列的文件 并且你想将其减少到分析所需的少数列 你会如何做呢 一种解决方案是否比另一种解决方案更有优势 假设我们有一个包含列 col1 col2 到 col200 的数据框 如果
  • 如何删除spark输出中的compactbuffer

    下面是我在spark shell中运行的程序 但是当我将输出保存在HDFS中时 我得到带有compactbuffer的输出 如何删除spark输出中的compactbuffer Program val a sc textFile datag
  • 按工作日对 pandas 数据框进行排序

    如何按工作日名称对 DataFrame 进行排序 我无法使用 pd to datetime 方法 因为我的日期不是数字 Date Transactions 0 Friday 140 652174 1 Monday 114 000000 2
  • 无法在 NetBeans 7.4rc1 上安装 nb-scala

    我已经安装了 NB 7 4rc1 并从下载了 nb scalahttp sourceforge net projects erlybird files nb scala http sourceforge net projects erlyb
  • 确定列的累积最大值

    我正在尝试以下代码 df pd DataFrame 23 52 36 49 52 61 75 82 97 12 columns A B df C np where df A gt df C shift df A df C shift pri
  • Dataframe unstack 性能 - pandas

    我正在尝试拆开数据框 它工作正常 但问题是我正在处理 CSV 文件中的巨大数据集 约 10 亿 这是示例数据集 236539 48512569874 Name Danny 236539 48512569874 Class 12 236539
  • 在 Jupyter 笔记本中使用 PySpark 读取 XML

    我正在尝试读取 XML 文件 df spark read format com databricks spark xml load path to my xml 并收到以下错误 java lang ClassNotFoundExceptio
  • 避免函数内装箱/拆箱

    对于数字密集型代码 我编写了一个具有以下签名的函数 def update f Int Int Double gt Double Unit 然而 因为Function3不是专门的 每个应用程序f结果对 3 个参数和结果类型进行装箱 拆箱 我可
  • 无法证明与路径相关类型的等价性

    为什么最后一个summon编译失败 我该怎么做才能让它编译 import java time LocalDateTime LocalTime trait Circular T type Parent given localTimeCircu

随机推荐

  • 自动装箱和拆箱在 Java 和 C# 中的行为是否不同

    我正在手动将代码从 Java 1 6 转换为 C 并发现基元 int 和 double 的行为存在一些困难 在 C 中 似乎几乎所有转换都会自动发生 List
  • 两个日期相减得到时间增量

    我正在尝试从我的数据库值之一获取一个值 该值将通过从今天的日期减去购买日期来给出 我这样写了我的代码 delta datetime now item purchase date 但这给了我这个错误 unsupported operand t
  • 使用 dask.delayed 和 pandas.DataFrame 将字典的 dask.bag 转换为 dask.dataframe

    我正在努力转换dask bag的字典到dask delayed pandas DataFrames进入决赛dask dataframe 我有一个函数 make dict 将文件读入相当复杂的嵌套字典结构 另一个函数 make df 将这些字
  • 第二次迭代文件不起作用[重复]

    这个问题在这里已经有答案了 我在迭代文件时遇到问题 这是我在解释器上输入的内容和结果 gt gt gt f open baby1990 html rU gt gt gt for line in f readlines print line
  • 如何在 Rust 中连接静态切片?

    我有两个静态切片u8我会实现一个函数来连接它们 类似的东西 fn concat u8 first static u8 second static u8 gt static u8 first second concat 编译器向我显示错误re
  • /usr/bin/rename:参数列表太长(批量重命名文件)

    我试图通过截断文件名中出现第一个空格的文件来批量重命名某些文件 我编写了一个简单的脚本来通过重命名来执行此操作 for i in fa do rename s s fa done 这在测试中效果很好 并根据需要产生以下结果 testenv
  • 如何在 VBA 中使用 FileSystemObject?

    有什么我需要参考的吗 我该如何使用这个 Dim fso As New FileSystemObject Dim fld As Folder Dim ts As TextStream 我收到错误 因为它无法识别这些对象 在 Excel 中 您
  • 无法实例化邮件功能。为什么会出现这个错误

    当我尝试通过 PHPMailer 发送邮件时 收到此错误消息 我的代码如下
  • MySQLi 中的 SELECT * FROM

    我的网站相当广泛 而且我最近刚刚切换到 PHP5 请称我为大器晚成者 我之前的所有 MySQL 查询都是这样构建的 SELECT FROM tablename WHERE field1 value field2 value2 这使得一切变得
  • 在两个圆之间画一个箭头?

    如何在两个圆之间绘制箭头线 给定 圆心的位置 圆的半径 我在用line and markersvg 对象 If I draw the arrows to the center of the circle then the arrow is
  • 为什么这个多重绑定不起作用

    我从我的复选框命令发送了多个参数 我用过转换器 代码如下 如果我放置一个调试器并看到这里的值就是我的结果 当复选框检查被选中或取消选中时 在转换器中 它具有值 项目对象和布尔值的数组 但是当我使用我的方法时 该值是一个对象 2 但两个值都是
  • 为单元测试创​​建对象 MockHttpServletResponse 时出错

    我试图使用 sprint test 为 Servlet 编写单元测试 模拟对象 我的 Maven 依赖项是
  • 将 double 转换为小数点后特定数字的科学记数法

    我想将双精度转换为科学计数法 如下所示 0 00752382528 gt 752383E 1 我可以用 ToString 或 Regex 来做到这一点吗 您可以使用标准格式字符串对于科学计数法 0 00752382528 ToString
  • SSL 证书链不同;如何验证?

    简洁版本 我看到 SSL 证书链根据我访问 https 服务器的方式而有所不同 这是怎么回事 在这种情况下我该如何验证证书 稍微长一点的版本 我正在尝试使用 libcurl 来验证 SSL 连接的证书 我连接到的服务器是 Amazon S3
  • 如何在Python中获取文件关闭事件

    在 windows 7 64 位机器上使用 python 2 7 如何获取文件关闭事件 当文件在文件打开器的新进程中打开时 例如记事本 写字板 每次在写字板的新进程中打开文件 当文件在文件打开器的选项卡中打开时 例如notepad 它在新选
  • 无法在 Heroku 上进行 pg_restore:“无法从输入文件读取:文件结尾”

    我正在尝试将本地 PostgreSQL 数据库复制到 Heroku 应用程序pg dump pg restore实用程序 按照 Heroku 的官方指南进行 https devcenter heroku com articles herok
  • 递归引用数据框

    有没有办法让数据框引用自身 我发现自己花了很多时间写类似的东西y Category1 is na y Category1 lt NULL这些内容很难阅读 而且感觉就像是大量缓慢的重复打字 我想知道是否有类似的内容 y Category1 i
  • SQL 嵌套替换

    我有一个复杂的嵌套替换 我用它来连接 MSSQL 中的两个表 select from A left outer join select from B on replace replace replace replace replace A
  • Numpy 源代码中的常量是在哪里定义的?

    我试图找到常量在 numpy 中存储的位置 一些方向会很好 里面定义了很多常量 numpy core include numpy npy math h 当前定义的是 从第 48 行开始 define NPY INFINITYF npy in
  • 如何从结构元素的嵌套数组创建 Spark DataFrame?

    我已将 JSON 文件读入 Spark 该文件具有以下结构 scala gt tweetBlob printSchema root related struct nullable true next struct nullable true