JSON 格式的 Spark DataFrame 列上的隐式架构发现

2023-11-24

我正在 Scala 阅读中编写 ETL Spark (2.4) 作业;-S3 上使用 glob 模式分隔的 CSV 文件。数据加载到 DataFrame 中并包含一列(假设它名为custom) 与 JSON 格式的字符串 (多层嵌套)。目标是自动从该列推断架构,以便可以为 S3 中的 Parquet 文件上的写入接收器构建它。

这个帖子 (如何使用 Spark DataFrames 查询 JSON 数据列?)建议schema_of_jsonSpark 2.4 中的可以从 JSON 格式的列或字符串推断模式。

这是我尝试过的:

val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

df.withColumn(
    "nestedCustom",
    from_json(col("custom"), jsonSchema, Map[String, String]())
)

但上面的方法不起作用并引发此异常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(`custom`)' due to data type mismatch: The input json should be a string literal and not null; however, got `custom`.;;
'Project [schemaofjson(custom#7) AS schemaofjson(custom)#16]

请记住,我正在过滤掉空值custom对于这个数据框。


EDIT:下面的整个代码。

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
  * RandomName entry point.
  *
  * @author Random author
  */
object RandomName {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("RandomName")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", true)
      .getOrCreate

    import spark.implicits._

    val randomName: RandomName = new RandomName(spark)

    val df: sql.DataFrame  = randomName.read().filter($"custom".isNotNull)
    val jsonSchema: String = df.select(schema_of_json(col("custom"))).as[String].first

    df.withColumn(
      "nestedCustom",
      from_json(col("custom"), jsonSchema, Map[String, String]())
    )

    df.show

    spark.stop
  }
}

class RandomName(private val spark: SparkSession) {

  /**
    * Reads CSV files from S3 and creates a sql.DataFrame.
    *
    * @return a sql.DataFrame
    */
  def read(): sql.DataFrame = {
    val tableSchema = StructType(
      Array(
        StructField("a", StringType, true),
        StructField("b", StringType, true),
        StructField("c", DateType, true),
        StructField("custom", StringType, true)
      ))

    spark.read
      .format("csv")
      .option("sep", ";")
      .option("header", "true")
      .option("inferSchema", "true")
      .schema(tableSchema)
      .load("s3://random-bucket/*")
  }
}

JSON 的示例:

{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "ramdom_data"
      },
      "lvl3b":  {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    }
  }
}

这是一个指标custom不是有效的输入schema_of_json

scala> spark.sql("SELECT schema_of_json(struct(1, 2))")
org.apache.spark.sql.AnalysisException: cannot resolve 'schemaofjson(named_struct('col1', 1, 'col2', 2))' due to data type mismatch: argument 1 requires string type, however, 'named_struct('col1', 1, 'col2', 2)' is of struct<col1:int,col2:int> type.; line 1 pos 7;
...

您应该返回到您的数据并确保custom确实是一个String.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JSON 格式的 Spark DataFrame 列上的隐式架构发现 的相关文章

随机推荐

  • Node.js HTTP 响应正文的 unicode 问题

    使用本机 http 模块的 HTTP 请求的响应正文显示 unicode 字符的问号字符 而不是其实际值 这是我正在运行的基本代码片段 var http require http var google http createClient 8
  • 未定义的行为,或者:Swift 有序列点吗?

    在 C C 中 第二条语句 int i 0 int j i i i 调用两者 未指定的行为 因为操作数的求值顺序 未指定 并且 未定义的行为 因为对同一个对象的副作用i彼此之间是无序的 参见示例 为什么这些构造 使用 是未定义的行为 未定义
  • 如何在 didReceiveRemoteNotification 中获取 userInfo JSON 值

    func application application UIApplication didReceiveRemoteNotification userInfo NSObject AnyObject PFPush handlePush us
  • Magento 产品不会显示在类别中

    我最近负责构建和部署一个大型电子商务网站 过去 我们不得不使用客户遗留的 X cart 安装进行重新开发 与他们现有的工作流程集成度太高 我们听说过 Magento 的优点 所以我设置了一个测试安装来掌握它 在几个最初的问题之后 有一个实时
  • 在 ClickOnce 应用程序中使用 EventLog

    我有一个在多个 ClickOnce 应用程序中使用的库 如果这个库出现错误 我想将错误写入窗口EventLog 我找到了一个知识库文章关于如何 但这似乎需要管理员权限才能搜索源 特别是当试图搜索时它会窒息Security事件日志 是否有办法
  • 在java中使用不同的用户调用外部进程

    我们有一个作为 Windows 服务运行的 Java 应用程序 特定功能需要执行二进制文件 但使用启动应用程序的不同用户 有什么方法可以让我们以 以不同用户身份运行 样式调用 exe 我检查了ProcessBuilder的API 但没有找到
  • Powershell 和 SQL 参数。如果为空字符串,则传递 DBNull

    我得到这个参数 objDbCmd Parameters Add telephone System Data SqlDbType VarChar 18 Out Null objDbCmd Parameters telephone Value
  • 为什么这个 Javascript RGB 到 HSL 代码不起作用?

    我发现这个 RGB 到 HSL 脚本位于http www mjijackson com 2008 02 rgb to hsl and rgb to hsv color model conversion algorithms in javas
  • 如何旋转元素并将其放置在左上角或右上角?

    我用文本旋转了一个 div 并想将其放置在左上角 我设法将其放置在顶部 但无法使其与左边缘对齐 我该怎么做呢 credit position absolute background color pink transform rotate 9
  • Chrome 和 Firefox CORS AJAX 调用在某些 Mac 计算机上中止

    我们有一个网页 www saddleback com live Chrome 和 Firefox CORS AJAX 调用在某些 Mac 计算机上会中止 在装有 OSX 10 9 最新更新 Chrome 和 Firefox 最新更新 的 M
  • iOS 聊天应用程序如何通信?

    Whatsapp 和 Skype 等应用程序中的文本聊天如何进行通信 具体来说 消息如何received即时的 提前致谢 这和iOS中所谓的Socket编程有关 您可以参考苹果开发人员文档或这个链接可以在这方面帮助您 事实上 以下链接会非常
  • 在 ubuntu 18.04 上更新后无法打开 libmpfr.so.4

    今天我将笔记本电脑更新到 Ubuntu 18 04 现在我尝试为我的学士论文运行一个程序 但它给了我以下错误消息 加载共享库时出错 libmpfr so 4 无法打开共享对象文件 没有这样的文件或目录 我做了一些研究 我认为 libmpfr
  • 如何在Java中用Swing正确实现MVC?

    如果您想了解更多详细信息 请告诉我 或参阅此问题的最后几行 我已经读了很多书 我觉得我正在把一些简单的东西变成复杂的东西 但我仍然被困在这里和那里 所以也许你可以在那些非常具体的点上帮助我 我使用的是 Netbeans IDE 7 和 JD
  • asp.net mvc 4控制器并行执行多个ajax调用

    我有一个 asp net MVC 4 控制器 其方法是通过 ajax 调用的 问题在于 ajax 请求是由控制器按顺序处理的 这会导致性能问题 因为加载页面的时间是所有 ajax 请求的总和 而不是最长的 ajax 请求 为了演示这一点 我
  • JavaScript - 获取满足条件的数组元素

    我正在使用 W3C 学习 JavaScript 但没有找到这个问题的答案 我正在尝试对满足某些条件的数组元素进行一些操作 除了在 for 循环中运行数组元素之外 还有其他方法吗 也许类似 用其他语言 foreach object t in
  • 在引导程序后向模块添加指令并应用于动态内容

    我有一个网页 其中定义了一个模块 myModule 我使用它来增强 angularjs angular bootstrap element myModule name 单击按钮后 我添加动态 html 并使用进行编译 compile
  • 本例中 C 语言逻辑表达式的短路行为

    PROGRAM include
  • .htaccess 重定向仅在浏览器警告后执行

    我有一个强制 HTTPS 和 www 的重写规则 SSL 证书适用于网站的 www 版本 整个网站需要是 HTTPS 问题是如果请求是https example com 在执行重定向之前 浏览器会显示一个警告页面 Firefox 中显示 此
  • sftp 避免在找不到文件时退出

    我有这个脚本 filePattern sor log filePattern2 sor SOR log myLocation opt tradertools omer clientLocation opt tradertools omer
  • JSON 格式的 Spark DataFrame 列上的隐式架构发现

    我正在 Scala 阅读中编写 ETL Spark 2 4 作业 S3 上使用 glob 模式分隔的 CSV 文件 数据加载到 DataFrame 中并包含一列 假设它名为custom 与 JSON 格式的字符串 多层嵌套 目标是自动从该列