Spark中json字符串和DataFrame相互转换

2023-11-10

本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。

json字符串转DataFrame

spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:

  • 若列数据全为null会用String类型
  • 整数默认会用Long类型
  • 浮点数默认会用Double类型
val json1 = """{"a":null, "b": 23.1, "c": 1}"""
val json2 = """{"a":null, "b": "hello", "d": 1.2}"""

val ds = spark.createDataset(Seq(json1, json2))
val df = spark.read.json(ds)
df.show
df.printSchema

+----+-----+----+----+
|   a|    b|   c|   d|
+----+-----+----+----+
|null| 23.1|   1|null|
|null|hello|null| 1.2|
+----+-----+----+----+

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: long (nullable = true)
 |-- d: double (nullable = true)

若指定schema会按照schema生成DF:

  • schema中不存在的列会被忽略
  • 可以用两种方法指定schema,StructType和String,具体对应关系看后面
  • 若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常
val schema = StructType(List(
        StructField("a", ByteType, true),
        StructField("b", FloatType, false),
        StructField("c", ShortType, true)
    ))
//或 val schema = "b float, c short"  
val df = spark.read.schema(schema).json(ds)
df.show
df.printSchema

+----+----+----+
|   a|   b|   c|
+----+----+----+
|null|23.1|   1|
|null|   0|null|
+----+----+----+

root
 |-- a: byte (nullable = true)
 |-- b: float (nullable = true)
 |-- c: short (nullable = true)

json解析相关配置参数

primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.
allowComments (default false): 忽略json字符串中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false): 允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。

mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。

PERMISSIVE :当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。 如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord (默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord

dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式. 只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX): 自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件

以上参数可用option方法配置:

val stringDF = spark.read.option("primitivesAsString", "true").json(ds)
stringDF.show
stringDF.printSchema

+----+-----+----+----+
|   a|    b|   c|   d|
+----+-----+----+----+
|null| 23.1|   1|null|
|null|hello|null| 1.2|
+----+-----+----+----+

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)

二进制类型会自动用base64编码方式表示

‘Man’(ascci) base64编码后为:”TWFu”


val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte)
val binaryDs = spark.createDataset(Seq(byteArr))
val dsWithB64 = binaryDs.withColumn("b64", base64(col("value")))

dsWithB64.show(false)
dsWithB64.printSchema

+----------+----+
|value     |b64 |
+----------+----+
|[4D 61 6E]|TWFu|
+----------+----+

root
 |-- value: binary (nullable = true)
 |-- b64: string (nullable = true)

//=================================================

dsWithB64.toJSON.show(false)
+-----------------------------+
|value                        |
+-----------------------------+
|{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+

//=================================================

val json = """{"value":"TWFu"}"""
val jsonDs = spark.createDataset(Seq(json))
val binaryDF = spark.read.schema("value binary").json(jsonDs )

binaryDF.show
binaryDF.printSchema

+----------+
|     value|
+----------+
|[4D 61 6E]|
+----------+

root
 |-- value: binary (nullable = true)

指定schema示例:

以下是Spark SQL支持的所有基本类型:

val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3, "floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23, "binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12 11:22:22.123123"}"""
val ds = spark.createDataset(Seq(json))
val schema = "stringc string, shortc short, integerc int, longc long, floatc float, doublec double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary, datec date, timestampc timestamp"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc   |datec     |timestampc             |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc    |1     |null    |3    |4.5   |6.7    |8.900   |true    |23   |[4D 61 6E]|2010-01-01|2012-12-12 11:22:22.123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

root
 |-- stringc: string (nullable = true)
 |-- shortc: short (nullable = true)
 |-- integerc: integer (nullable = true)
 |-- longc: long (nullable = true)
 |-- floatc: float (nullable = true)
 |-- doublec: double (nullable = true)
 |-- decimalc: decimal(10,3) (nullable = true)
 |-- booleanc: boolean (nullable = true)
 |-- bytec: byte (nullable = true)
 |-- binaryc: binary (nullable = true)
 |-- datec: date (nullable = true)
 |-- timestampc: timestamp (nullable = true)

复合类型:

val json = """
{
  "arrayc" : [ 1, 2, 3 ],
  "structc" : {
    "strc" : "efg",
    "decimalc" : 1.1
  },
  "mapc" : {
    "key1" : 1.2,
    "key2" : 1.1
  }
}
"""
val ds = spark.createDataset(Seq(json))
val schema = "arrayc array<short>, structc struct<strc:string, decimalc:decimal>, mapc map<string, float>"
val df = spark.read.schema(schema).json(ds)
df.show(false)
df.printSchema

+---------+--------+--------------------------+
|arrayc   |structc |mapc                      |
+---------+--------+--------------------------+
|[1, 2, 3]|[efg, 1]|[key1 -> 1.2, key2 -> 1.1]|
+---------+--------+--------------------------+

root
 |-- arrayc: array (nullable = true)
 |    |-- element: short (containsNull = true)
 |-- structc: struct (nullable = true)
 |    |-- strc: string (nullable = true)
 |    |-- decimalc: decimal(10,0) (nullable = true)
 |-- mapc: map (nullable = true)
 |    |-- key: string
 |    |-- value: float (valueContainsNull = true)

SparkSQL数据类型

基本类型:

DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3) “decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”

三个复合类型:

DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array<int> array ARRAY<INT> 4 array<int> {“type”:”array”,”elementType”:”integer”,”containsNull”:true}
MapType(StringType, LongType, true) map<string,bigint> map MAP<STRING, BIGINT> 28 map<string,bigint> {“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true}
StructType(StructField(“sf”, DoubleType)::Nil) struct<sf:double> struct STRUCT<`sf`: DOUBLE> 8 struct<sf:double> {“type”:”struct”,”fields”:[{“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:{}}]}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark中json字符串和DataFrame相互转换 的相关文章

随机推荐

  • 【电气专业知识问答】问:蓄电池组、直流电源系统是由哪几部分组成?

    电气专业知识问答 问 蓄电池组 直流电源系统是由哪几部分组成 答 蓄电池组直流电源系统通常是由直流充电装置及其监控系统 蓄电池组及其检测装置 直流馈线输出屏 直流绝缘监察装置等组成的 1 充电装置 直流充电装置当今多采用高频开关电源或相控型
  • 《疯狂Java讲义》读书笔记(四):Java基础类库

    第七章 Java基础类库 使用Scanner获取键盘输入 Scanner类提供了多个构造器 不同构造器可以接收文件 输入流 字符串作为数据源 主要提供了2个方法 hasNextXXX 是否还有下一个输入项 XXX可以表示Int Long等
  • 记录SQL Server数据库中如何指定用户查看指定的视图

    exec sp addrole seeview 创建了一个数据库角色 名称为 seeview 分配视图权限 GRANT SELECT ON veiw TO 角色 指定视图列表 指定seeview这个角色可以查看的视图表名称 也就是这个角色可
  • QuickCam Gev 2.0 开发

    安装 QuickCam Gev2 0 版本比较老了 是2012 年还没被Dalsa收购时候的产品 因为项目需要 开发了其驱动 安装QuickCam Gev2 0的时候Ebus卡住安装不上 但是不能cancel了 打开QuickCamGev
  • spyder_console窗口错误_An error ocurred while starting the kernel

    按照上面的要求安装spyder kernels 但是依然出错 通过观察右下角发现spyder选用的python版本是python3 9 16 而自己通过python版本查看发现默认的3 9 13 所以安装好的是base环境3 9 13 切换
  • java中JVM的原理

    看过JVM讲解最好的 一 Java虚拟机的生命周期 Java虚拟机的生命周期 一个运行中的Java虚拟机有着一个清晰的任务 执行Java程序 程序开始执行时他才运行 程序结束时他就停止 你在同一台机器上运行三个程序 就会有三个运行中的Jav
  • erpadmin答疑为什么企业有很多“不上ERP等死,上了ERP找死”

    不上ERP等死 上了ERP找死 如何破除这个 魔咒 希望erpadmin总结的如何做好ERP系统实施工作的方法对你有所启发 ERP是建立在信息技术基础上 整合了企业管理理念 业务流程 基础数据 人力物力财力 计算机硬件和软件于一体的企业资源
  • 使用B站API:http://api.bilibili.com/x/space/upstat?mid=2026561407获取播放量、点赞量的返回报文中data数据缺失问题排查(已解决)

    背景 想要用ESP32获取一些b站上的数据粉丝量播放量等数据 获取粉丝数的API http api bilibili com x relation stat vmid 2026561407 获取播放量的API http api bilibi
  • script 标签 async 属性

    script 标签 async 属性 普通script 文档解析的过程中 如果遇到script脚本 就会停止页面的解析进行下载 但是Chrome会做一个优化 如果遇到script脚本 会快速的查看后边有没有需要下载其他资源的 如果有的话 会
  • python-django的JsonResponse返回中文数据编码问题

    JsonResponse res 方法1 直接加这一句即可 json dumps params ensure ascii False return JsonResponse user 王 password 123456 json dumps
  • 一文读懂卷积神经网络CNN(学习笔记)

    来源 机器学习算法与自然语言处理 作者 白雪峰 本文为图文结合 建议阅读10分钟 本文为大家解读如何简单明了的解释卷积 并且分享了学习中的一些方法案例 首先文章的提纲为 CNN栗子镇楼 What is CNN
  • 期货交易大神的分享,有用的策略

    1 每天只做一次 开盘后行情形成后开仓 2 在价格走势很慢的时候进入 开完仓价格朝着不利方向走 就无条件平仓 当天不再做第二次 3 开完仓价格朝着有利的一侧运行后 确认后在开仓价设好止损 通过条件单或闪电手自动止损功能 不再关注行情 收盘之
  • #pragma once用法总结,及与 #ifndef方式的区别

    1 pragmaonce这个宏有什么作用 为了避免同一个头文件被包含 include 多次 C C 中有两种宏实现方式 一种是 ifndef方式 另一种是 pragma once方式 在能够支持这两种方式的编译器上 二者并没有太大的区别 但
  • MATLAB 学习笔记(3)MATLAB 矩阵的进阶操作

    目录 MATLAB 矩阵标量操作 实际例子 MATLAB 矩阵的转置 实际例子 MATLAB 串联矩阵 实际例子 MATLAB 矩阵的行列式 MATLAB 逆矩阵 详细例子 MATLAB 矩阵标量操作 标量指的是只有大小没有方向的数 与之相
  • 大学数学竞赛常用不等式_第三届全国大学生数学竞赛初赛(专业组)

    系列传送门 陆艺 第一届全国大学生数学竞赛初赛 专业组 陆艺 第二届全国大学生数学竞赛初赛 专业组 陆艺 第三届全国大学生数学竞赛初赛 专业组 陆艺 第四届全国大学生数学竞赛初赛 专业组 陆艺 第五届全国大学生数学竞赛初赛 专业组 陆艺 第
  • SpringBoot+MyBatis:解决前端上传文件并将url保存到数据库

    前言 最近也是遇到了这个问题 最后成功解决 前期在网上搜索了很多内容 发现都很复杂而且都不尽相同 况且不同的开发软件 不同的配置都会增加我们参考时的麻烦 这里为大家放上了更加简便的方法 开发软件 SpringToolSuite4 个人认为比
  • Allegro如何取消网络高亮

    有时PCB里面不知道为什么有很多网络和焊盘高亮 看着很不协调 想要取消高亮的方法为先点击Dehilight 然后在Options的Dehighligh all里面选择all 如下图 高亮 取消高亮
  • 关于单片机头文件的使用方法

    在单片机的使用中 我们经常会在文件的开始部分进行头文件的定义 即使我们在编写十分简单的LED驱动程序时 往往也引用了头文件 include
  • AttributeError: module ‘time‘ has no attribute ‘clock‘

    报错 AttributeError module time has no attribute clock 原因是 Python3 8 不再支持time clock 但在调用时 非本工程文件CBTaggingDecoder依然包含该方法 修改
  • Spark中json字符串和DataFrame相互转换

    本文介绍基于Spark 2 0 的Json字符串和DataFrame相互转换 json字符串转DataFrame spark提供了将json字符串解析为DF的接口 如果不指定生成的DF的schema 默认spark会先扫码一遍给的json字