关于如何以编程方式从 json 文件开始创建自定义 org.apache.spark.sql.types.StructType 架构对象

2024-03-21

我必须使用 json 文件中的信息创建一个自定义 org.apache.spark.sql.types.StructType 架构对象,json 文件可以是任何内容,所以我在属性文件中对其进行了参数化。

属性文件如下所示:

//ruta al esquema del fichero output (por defecto se infiere el esquema del Parquet destino). Si existe, el esquema será en formato JSON, aplicable a DataFrame (ver StructType.fromJson)
schema.parquet=/Users/XXXX/Desktop/generated_schema.json
writing.mode=overwrite
separator=;
header=false

文件 generated_schema.json 如下所示:

{"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}

所以,这就是我认为我可以解决它的方式:

val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually( inputStream.readLine))

System.out.println("schema_json looks like "  + schema_json.head)

val mySchemaStructType :DataType = DataType.fromJson(schema_json.head)

/*
After this line, mySchemaStructType have four StructFields objects inside it, the same than appears at schema_json
*/
logger.info(mySchemaStructType)

val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)

/*

After this line, myStructType have zero StructFields! here must be the bug, myStructType should have the four StructFields that represents the loaded schema json! this must be the error! but how can i construct the necessary StructType object?

*/

myDF = loadCSV(sqlContext, path_input_csv,separator,myStructType,header)
System.out.println("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()

df.write
  .format("com.databricks.spark.csv")
  .option("header", header)
  .option("delimiter",delimiter)
  .option("nullValue","")
  .option("treatEmptyValuesAsNulls","true")
  .mode(saveMode)
  .parquet(pathParquet)

当代码运行最后一行 .parquet(pathParquet) 时,会发生异常:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

这段代码的输出是这样的:

16/11/11 13:57:04 INFO AnotherCSVtoParquet$: The job started using this propertie file: /Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_input_csv is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_output_parquet  is /Users/aisidoro/Desktop/output900000
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: mra_schema_parquet is /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: writting_mode is overwrite
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: separator is ;
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: header is false
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: ATTENTION! aplying mra_schema_parquet  /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
schema_json looks like {"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
16/11/11 13:57:12 INFO AnotherCSVtoParquet$: StructType(StructField(codigo,StringType,true), StructField(otro,StringType,true), StructField(vacio,StringType,true), StructField(final,StringType,true))
 16/11/11 13:57:13 INFO AnotherCSVtoParquet$: loadCSV. header is false, inferSchema is false pathCSV is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv separator is ;
 myDF.schema.json looks like {"type":"struct","fields":[]}

schema_json 对象和 myDF.schema.json 对象应该具有相同的内容,不是吗?但它并没有发生。我认为这一定会引发错误。

最后,工作因以下例外而崩溃:

**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**

事实是,如果我不提供任何 json 模式文件,作业执行得很好,但是使用这个模式......

有谁能够帮助我?我只想从 csv 文件和 json 模式文件开始创建一些镶木地板文件。

谢谢。

依赖项是:

    <spark.version>1.5.0-cdh5.5.2</spark.version>
    <databricks.version>1.5.0</databricks.version>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>${databricks.version}</version>
    </dependency>

UPDATE

我可以看到有一个悬而未决的问题,

https://github.com/databricks/spark-csv/issues/61 https://github.com/databricks/spark-csv/issues/61


既然你说Custom Schema,你可以做这样的事情。

val schema = (new StructType).add("field1", StringType).add("field2", StringType)
sqlContext.read.schema(schema).json("/json/file/path").show

另外,看看this https://stackoverflow.com/questions/33807145/evolving-a-schema-with-spark-dataframe and this http://blog.antlypls.com/blog/2016/01/30/processing-json-data-with-sparksql/

您可以创建嵌套的 JSON 架构,如下所示。

例如:

{
  "field1": {
    "field2": {
      "field3": "create",
      "field4": 1452121277
    }
  }
}

val schema = (new StructType)
  .add("field1", (new StructType)
    .add("field2", (new StructType)
      .add("field3", StringType)
      .add("field4", LongType)
    )
  )
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

关于如何以编程方式从 json 文件开始创建自定义 org.apache.spark.sql.types.StructType 架构对象 的相关文章

  • for-yield-getOrElse 是 Scala 的范例还是有更好的方法?

    基本上我想提取一堆选项 a b 等 这是在 Scala 中执行此操作的最佳方法吗 对于我来说 括号中的 for yield 看起来有点令人困惑 for a lt a b lt b c lt c yield getOrElse 尝试使用map
  • Scala 中的多个类型下限

    我注意到tuple productIterator总是返回一个Iterator Any 想知道是否无法设置多个下限 因此它可能是最低公共超类型的迭代器 我尝试并搜索了一下 但只发现this https stackoverflow com q
  • 如何使用 Apache Livy 设置 Spark 配置属性?

    我不知道在向 Apache Livy 提交 Spark 作业时如何以编程方式传递 SparkSession 参数 这是测试 Spark 作业 class Test extends Job Int override def call jc J
  • SBT - 运行任务来设置SettingKey

    所以我的一般问题是我想根据任务的结果设置版本密钥 但是版本密钥是在任务运行之前设置的 据我了解 一旦设置了键的值 我就无法更改它 因此我无法在我的任务中更改它 我想要做的是将任务作为发布任务的依赖项运行并更改版本的值 我觉得一定有办法做到这
  • 对于值类型,asInstanceOf[X] 和 toX 之间有什么区别吗?

    我使用 IntelliJ 将 Java 代码转换为 Scala 代码的功能 通常效果很好 看来 IntelliJ 用调用替换了所有强制转换asInstanceOf 是否有任何有效的用法asInstanceOf Int asInstanceO
  • 如何从命令行运行scala文件?

    scala是否支持scala run xxx scala go语言支持这样运行 go my go 并且Python支持 python my py 但看来 scala xxx scala 仅进行语法检查 未观察到任何输出或运行行为 那么有没有
  • 案例类和案例对象之间的区别?

    我正在学习 Scala 和 Akka 并且在最近的查找中solution https stackoverflow com questions 22770927 waiting for multiple results in akka 我发现
  • Scala SBT 和 JNI 库

    我正在编写一个简单的应用程序Scala通过以下方式使用 leveldb 数据库leveldbjni图书馆 我的build sbt文件看起来像这样 name Whatever version 1 0 scalaVersion 2 10 2 l
  • 使用 slick 3.0.0-RC1 无法在 TableQuery 上找到方法结果

    我正在尝试 Slick3 0 0 RC1我遇到了一个奇怪的问题 这是我的代码 import slick driver SQLiteDriver api import scala concurrent ExecutionContext Imp
  • 如何询问 Scala 类型参数的所有实例化是否存在证据?

    给定皮亚诺数的以下类型级加法函数 sealed trait Nat class O extends Nat class S N lt Nat extends Nat type plus a lt Nat b lt Nat a match c
  • 无法在 SBT 中运行 Apache Spark 相关单元测试 - NoClassDefFoundError

    我有一个简单的单元测试 使用SparkContext 我可以在 IntelliJ Idea 中运行单元测试 没有任何问题 但是 当尝试从 SBT shell 运行相同的测试时 我收到以下错误 java lang NoClassDefFoun
  • 用惯用的 Scala 更新大型数据结构

    我已经尝试 Scala 一段时间了 并且经常遇到支持不可变数据结构的建议 但是当你有一个像这样的数据结构时3D 场景图 大型神经网络或任何具有大量需要频繁更新的对象的东西 对场景中的对象进行动画处理 训练神经网络 这似乎是 运行时效率极低
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • Scala 中抛出异常,什么是“官方规则”

    我正在 Coursera 上学习 Scala 课程 我也开始阅读 Odersky 的 Scala 书 我经常听到的是 在函数式语言中抛出异常不是一个好主意 因为它破坏了控制流 并且我们通常返回一个失败或成功的 Either Scala 2
  • Haskell scala 互操作性

    我是 Scala 初学者 来自面向对象范式 在了解 Scala 的函数式编程部分时 我被引导到 Haskell 纯函数式编程语言 探索 SO 问题答案 我发现 Java Haskell 具有互操作性 我很想知道 Scala Haskell
  • Spark日期格式问题

    我在火花日期格式中观察到奇怪的行为 实际上我需要转换日期yy to yyyy 日期转换后 日期应为 20yy 我尝试过如下 2040年后失败 import org apache spark sql functions val df Seq
  • 通用 scala 函数,其输入是变量数量的函数

    我想定义一个函数f需要另一个函数g 我们需要g采取采取n双打 对于某些固定n 并返回一个 Double 函数调用f g 应该返回具体值n 例如 f Math max 2因为 Math sin 具有类型 Double Double gt Do
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 多个 scala 库导致 intellij 出错?

    我正在使用 intellij 14 和 scala 2 11 6 使用 homebrew 安装并使用符号链接 ln s usr local Cellar scala 2 11 6 libexec src usr local Cellar s

随机推荐

  • 记录 ServiceStack Web 服务 [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 用于记录基于 ServiceStack 的 Web 服务的选项有哪些 我不是在谈论单行字符串 我希望能够详细记录 可能很长 返回类型
  • 定义 CreateProjection 和 CreateMap

    在我的项目中 我使用实体框架进行 ORM 使用 Dto 类进行 api 响应 我使用 Automapper 在两者之间进行映射 当我需要直接从 EF 可查询中获取 Dtos 时 我会执行最后的操作ProjectTo lt gt 根据我的查询
  • 将 ggplot 标题放置在绘图的右上角

    我正在使用优秀的theme minimal 在 ggplot0 9 3 中找到 它具有白色背景 我想将绘图的标题放置在绘图右上角的自定义位置 在下面的例子中我知道x and y值 但我想知道是否有办法通过xmax and ymax值以确保文
  • 迭代Go地图获取索引

    为了使用revel s even https revel github io manual templates html even模板中的关键字我想在迭代时获取地图条目的索引range 有什么办法可以做到吗 我的地图具有以下结构 map s
  • dequeueReusableCellWithReuseIdentifier: 和 cellForItemAtIndexPath: 之间的区别

    我一直想知道为什么我的代码可以很好地工作cellForItemAtIndexPath 不与dequeueReusableCellWithReuseIdentifier 在获取集合视图单元格时 这是我的代码 这个效果很好 NSInteger
  • 为连续序列和分割向量创建分组变量

    我有一个向量 例如c 1 3 4 5 9 10 17 29 30 我想将形成规则 连续序列的 相邻 元素分组在一起 即在参差不齐的向量中增加 1 结果是 L1 1L2 3 4 5L3 9 10 L4 17L5 29 30 天真的代码 前 C
  • 如何确定 Qt 5 中的 QtWebEngine 在运行时使用的是哪个 chromium 版本?

    我在 Qt 5 中找不到任何函数来确定使用哪个 chromium 版本QtWebEngine 我不想在代码中硬编码 chromium 版本 因为我经常更新我的应用程序 并且每个版本中的 chromium 版本通常都会更改 而且 Qt 是向后
  • 目录相对 ZwCreateFile

    我必须为我的大学项目实施交叉视图文件完整性检查器 为此 我如何在内核模式下列出目录的文件 你的起点是ZwCreateFile http msdn microsoft com en us library windows hardware ff
  • 如何使用 ComPtr 中包装的 Direct3D 11 指针来获取 11.1 接口?

    我正在遵循教程 并将通常的初始化转换为使用 ComPtrs 直到这一行 ID3D11Device g pd3dDevice nullptr ID3D11Device1 g pd3dDevice1 nullptr Obtain the Dir
  • TriangleMesh JavaFX 中 getNormals() 方法的用途是什么

    我目前正在开发 JavaFX 3D 应用程序 并在 TriangleMesh 类中遇到 getNormals 方法 正如 Triangle Mesh 类用于创建用户定义的 Java FX 3D 对象一样 其中getPoints 用于添加Po
  • Python:subprocess.Popen() 的第一个实例非常慢

    我确信我错过了一些简单的东西 但是当使用子进程模块时 启动第一个子进程需要等待一段非常长的时间 gt 10 秒 第二个在第一个之后不久开始 有没有什么办法解决这一问题 代码如下 编辑 要添加 HWAccess 在 proc py 中 链接一
  • 如何将音频文件录制为 .m4a 格式?

    如何将音频文件录制为 m4a 格式 我正在使用下面的代码 public void startRecording throws IOException recorder new MediaRecorder path sdcard pithys
  • PyCharm:Scapy 未解决的参考

    我正在开发一个使用 scapy 用 python 编写的网络工具 我使用 Pycharm 作为 IDE 我的代码有效 因此 如果我运行它 一切都会按预期进行 我的问题是 PyCharm 给了我一些错误 它标志着每次使用IP TCP Ethe
  • 将 scanf 与 x86-64 GAS 组件结合使用

    我在尝试调用系统函数 scanf 以在我的 x86 汇编程序中工作时遇到了很多问题 目前我已经让它从标准中读取 但是它只会读取没有段错误的字符 我不知道为什么 指定字符串是 d 我在网上看到的 x86 中的 scanf 示例使用 quark
  • Git 忽略文件,而不删除它

    我有一个使用 GIT 进行版本控制的网站 我设置了一个系统 基本上可以自动部署我的更改master分支到我的生产服务器 也就是说 我的存储库中有一个 Web 挂钩 它会触发一个 PHP 脚本 该脚本本质上会启动一个git pull在服务器上
  • WatiN 搜索 google 后找不到文字

    我正在尝试运行一个简单的等待示例 搜索谷歌然后验证搜索结果 在 IE9 上 var browser new IE http www google com ncr browser TextField Find ByName q TypeTex
  • 从用于计算三角形和外接圆的 Swingworker 中重新绘制小程序

    我正在尝试复制找到的小程序here http www diku dk hjemmesider studerende duff Fortune 作为练习的一部分 该小程序使用 财富 的算法来生成两者 Voronoi 图和 Delaunay 三
  • 大背景图像和屏幕尺寸

    我正在创建一个网站 该网站将使用无法平铺的图像 我需要这张图像覆盖整个背景屏幕 但是 我希望它能够在大型显示器和小型显示器上运行 我应该制作一张大背景图像并使用它缩小它吗background size或者我应该创建不同尺寸的同一图像的多个版
  • 当定义需要 import 语句时,如何扩展现有接口?

    我创建了一个自定义 Knockout 扩展器 但在扩展 Knockout 定义文件提供的现有接口时遇到了问题 扩展器 Numeric ts import as ko from knockout function Extender targe
  • 关于如何以编程方式从 json 文件开始创建自定义 org.apache.spark.sql.types.StructType 架构对象

    我必须使用 json 文件中的信息创建一个自定义 org apache spark sql types StructType 架构对象 json 文件可以是任何内容 所以我在属性文件中对其进行了参数化 属性文件如下所示 ruta al es