展平 Scala Spark Dataframe 中的嵌套 json

2024-03-01

我有多个来自任何restapi 的json,但我不知道它的架构。我无法使用 dataframes 的爆炸功能,因为我不知道由 Spark api 创建的列名称。

1.我们可以通过解码值来存储嵌套数组元素的键吗dataframe.schema.fields,由于spark仅提供数据帧行中的值部分,并以顶级键作为列名。

数据框——

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

是否有任何最佳方法可以通过在运行时确定架构来使用数据帧方法来压平 json。

示例 Json -:

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}

注意 - 我们需要在 dataframe 中执行所有操作,因为有大量数据即将到来,我们无法解析每个 json。


尽量避免展平所有列。

创建辅助函数&您可以直接调用df.explodeColumns在数据帧上。

下面的代码将展平多层数组和结构类型列。

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    def columns = {
      val dfColumns = df.columns.map(_.toLowerCase)
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
          dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
defined class DFHelpers

扁平柱

scala> df.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)


scala> df.explodeColumns.printSchema
root
 |-- author: string (nullable = true)
 |-- frameworks_id: long (nullable = true)
 |-- frameworks_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala>

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

展平 Scala Spark Dataframe 中的嵌套 json 的相关文章

随机推荐

  • 通过管道连接 popen stderr 和 stdout

    我想通过 python 从目录调用脚本 它们是可执行的 shell 脚本 到目前为止 一切都很好 for script in sorted os listdir initdir reverse reverse if script endsw
  • python -m SimpleHTTPServer - 监听 0.0.0.0:8000 但 http://0.0.0.0:8000/test.html 给出“找不到页面”

    cd 到我的文件夹后 我输入 python m SimpleHTTPServer and get Serving HTTP on 0 0 0 0 port 8000 回复 但当我击中http 0 0 0 0 8000 test html我收
  • JPA/Hibernate 有条件一对多关系?

    我正在使用 Hibernate Tools 直接从数据库生成 DAO 和类 数据库中有两张表 表A和B A到B之间存在一对多关系 B中的多行映射到单个A 在A A类 的生成代码中 有B类的集合 体现了一对多的关系 但是 我不需要 B 中属于
  • 从 ejs 模板生成静态 HTML 文件

    好的 我已经启动并运行了我的基本 Nodejs 网站 这一切都正常工作并通过节点服务器运行 使用我的基本节点站点 https github com andrewbrandwood node basic site 它使用 ejs 作为模板引擎
  • Java 中内存分配的典型速度是多少?

    我正在分析一个 Java 应用程序 发现对象分配的速度比我预期的要慢得多 我运行了一个简单的基准测试来尝试确定小对象分配的整体速度 我发现在我的机器上分配一个小对象 3 个浮点数的向量 似乎需要大约 200 纳秒 我在 双核 2 0 GHz
  • Json.NET:反序列化嵌套字典

    将对象反序列化为Dictionary JsonConvert DeserializeObject
  • openerp中的父左和父右

    什么是父左和父右 它是如何运作的Openerp Thanks 正如拉斐尔 科莱 Raphael Collet 所解释的那样他关于 OpenERP Server 的回答 https answers launchpad net openobje
  • 如何使用 WebStorm 进行 Chrome 扩展开发?

    我刚刚购买了 WebStorm 5 到目前为止一直非常喜欢它的检查功能 我在开发 Chrome 扩展程序时遇到的一个问题是它无法识别chrome多变的 有什么办法可以添加chrome变量到检查器以便它可以在我键入时自动完成 我猜我需要添加
  • SurfaceTexture updateTexImage 共享 2 个 EGLContext - Android 4.4 上的问题

    我指的是这个关于如何将相机的预览帧直接编码到 mp4 文件中的优秀示例 http bigflake com mediacodec CameraToMpegTest java txt http bigflake com mediacodec
  • 这不知何故冻结了我的整个程序,我不确定为什么?

    所以我在线程上运行所有内容run and not run按预期工作 但是running不打印 我尝试调用它 status text Working print run process 但这只会冻结我的整个程序 我也尝试放入root afte
  • 如何在SAPUI5中自定义Shell容器[重复]

    这个问题在这里已经有答案了 我有一个外壳容器 在大屏幕上我想充分利用屏幕 我想覆盖整个区域 我如何定制它 我假设您正在使用 XML 来表达您的观点 添加以下属性appWidthLimited false 到 Shell 标签
  • 如何从我的位置在 Google Maps API V2 中绘制路线 [重复]

    这个问题在这里已经有答案了 我想进行方向应用 但是 我在绘制从我的位置到目的地的路线时遇到问题 我从我的位置获取变量经度和纬度 但我不知道画线 我想绘制到该位置的方向 6 984873352070259 108 48140716552734
  • 从 MediaStream 对象获取媒体详细信息(分辨率和帧速率)

    我正在捕获用户的相机 我想以尽可能最佳的分辨率捕获图片 所以我的代码类似于下面的代码片段 我想从传入流中读取分辨率详细信息 因此我可以将其设置为视频高度和宽度 我将用它来单击快照 我希望快照具有流提供的最佳质量 这可能吗 读取分辨率详细信息
  • “粗箭头”(=>)何时绑定到“this”实例

    粗箭头可以在不同的设置中使用 但不知何故却不能 始终绑定到我想要的实例 粗箭头绑定3次 声明方法时 在方法内声明函数时 在全局上下文中声明函数时 1 声明方法时 当 Coffeescript 编译器遇到以下语法模式时 在类声明中 class
  • 在单个文件中重新启动/撤消冲突解决方案

    在具有多个冲突文件的较大 git 合并中 我错误地将文件标记为已解决 使用git add FILE经过一些编辑 现在我想撤消冲突解决尝试并重新开始解决该文件 我怎样才能做到这一点 在这里找到了解决方案 http gitster livejo
  • 使用 valueForKeyPath 获取数组元素

    有什么办法可以访问NSArray元素与valueForKeyPath 例如 谷歌的反向地理编码服务返回非常复杂的数据结构 如果我想获取城市 现在我必须将其分成两个调用 如下所示 NSDictionary address NSString s
  • 计算 PHP echo 表中的出现次数

    我是 PHP 和 MySQL 的新手 虽然 StackOverflow 上有很多这方面的示例 但它们都不太适合我的情况 所以 我有一张表 名为votes 看起来像这样 student name student id teacher Joe
  • 解释重构[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 Question 我的问题是如何教授整理和重构代码的方法和重要性 背景 我最近正在为一位同事进行代码审查 他们对早已消失的同事工作做了一些
  • 如何在 UIlabel 中显示阿拉伯语文本

    我从服务器得到这个字符串作为响应 它实际上是阿拉伯语 1606 1585 1610 1583 1571 1606 1606 1585 1609 1607 1584 1575 1601 1610 1575 1604 1604 1594 157
  • 展平 Scala Spark Dataframe 中的嵌套 json

    我有多个来自任何restapi 的json 但我不知道它的架构 我无法使用 dataframes 的爆炸功能 因为我不知道由 Spark api 创建的列名称 1 我们可以通过解码值来存储嵌套数组元素的键吗dataframe schema