从 Hive 表将 DataFrame 中的 ArrayBuffer 转换为 HashSet 到 RDD 时出现 GenericRowWithSchema 异常

2023-12-10

我有一个镶木地板格式的 Hive 表,是使用生成的

create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;

我能够验证它是否已填充 - 这是一个示例值

[1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]

我希望将其放入以下形式的 Spark RDD 中

((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))

现在,使用spark-shell(我在spark-submit中遇到了同样的问题),我用这些值做了一个测试RDD

scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85

使用迭代器,我可以将 ArrayBuffer 转换为以下新 RDD 中的 HashSet:

scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

scala> tempRDD2.collect.foreach(println)
((1,abcdef),((2,ghijkl),Set((1,hello))))

但是,当我尝试使用带有 HiveContext / SQLContext 的 DataFrame 执行完全相同的操作时,出现以下错误:

scala> val hc = new HiveContext(sc)
scala> import hc._
scala> import hc.implicits._

scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")

scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

scala> tempRDD3.collect.foreach(println)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
       at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
       at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
       at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
       at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
       at org.apache.spark.scheduler.Task.run(Task.scala:64)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:724)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

请注意,当我使用 Spark-submit 在已编译的程序中运行此错误时,我收到相同的错误“GenericRowWithSchema 无法转换为 scala.Tuple2”。当程序遇到转换步骤时,程序在运行时崩溃,并且我没有编译器错误。

我觉得很奇怪的是,我的人工生成的 RDD“tempRDD”可以进行转换,而 Hive 查询 DataFrame->RDD 却不能。我查了一下,两个 RDD 都有相同的形式:

scala> tempRDD
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

scala> tempRDDfromHive
org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

唯一的区别是他们最后一步的起源。在运行 tempRDD2 和 tempRDD3 的步骤之前,我什至尝试过持久化、检查点和具体化这些 RDD。所有人都收到相同的错误消息。

我还阅读了相关的 stackoverflow 问题和 Apache Spark Jira 问题,并从中尝试将 ArrayBuffer 转换为迭代器,但在第二步中也失败了,并出现相同的错误。

有谁知道如何将源自 Hive 表的 DataFrame 的 ArrayBuffer 正确转换为 HashSet ?由于该错误似乎仅针对 Hive 表版本,因此我很想认为这是 SparkSQL 中 Spark/Hive 集成的问题。

有任何想法吗?

我的Spark版本是1.3.0 CDH。

以下是 printSchema 结果:

scala> tempRDDfromHive.printSchema()
root
 |-- var1: integer (nullable = true)
 |-- var2: string (nullable = true)
 |-- var3: integer (nullable = true)
 |-- var4: string (nullable = true)
 |-- var5: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: string (nullable = true)

期间你实际得到了什么map相不是一个ArrayBuffer[(Int, String)] but an ArrayBuffer[Row]因此出现错误。忽略其他列,您需要的是这样的:

import org.apache.spark.sql.Row

tempHiveQL.map((a: Row) =>
    a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)

看起来这个问题已经在 Spark 1.5.0 中修复了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Hive 表将 DataFrame 中的 ArrayBuffer 转换为 HashSet 到 RDD 时出现 GenericRowWithSchema 异常 的相关文章

随机推荐

  • 使用新的架构更改更新 LinqtoSql 数据库?

    我有一个已发布到市场的 Windows Phone 7 应用程序 我将 Sql CE 与 LinqToSql 一起使用 当应用程序运行时 它会通过连接字符串检查数据库是否存在 如果不存在则创建数据库 using CheckbookDataC
  • 在 Google Apps 脚本中转义正则表达式文字

    我不知道为什么这不起作用 我已经通过我在网上找到的更好的正则表达式工具之一来验证它 并且我之前使用的似乎是转义字符 号使其成为字面 但 Google 脚本一直抱怨 无效量词 第 2 行 这是我的脚本 省略了某些个人详细信息 这是为了清理电子
  • Django - 将额外参数传递给 upload_to 可调用函数

    我知道您可以使用 upload to 参数传递可调用函数来动态更改 Django 模型中的 FileFied ImageField 等 upload to 调用的函数传递了 2 个变量 即未保存在数据库中的文件的实例 instance 和所
  • 将分组平均值添加到数据框中的列[重复]

    这个问题在这里已经有答案了 我想计算数据框中的组平均值 并在包含这些组平均值的原始数据框中创建一个新列 我正在进行重复性研究 我想要新列中插入 单元和通道内测量的平均值 以便我可以将其减去并计算残差 My data gt head myte
  • 如何在 HttpPost 中使用参数

    我正在通过以下方法使用 RESTful Web 服务 POST Consumes application json Path create public void create String str1 String str2 System
  • 定期运行 JavaScript 函数

    我目前正在建立一个网站来托管软件 我想要的是在项目页面中添加循环截图的幻灯片 大约每 5 秒更改一次图像 有没有办法仅使用 JavaScript 在一定时间间隔触发脚本 或者我是否必须采用替代方法来实现我想要的功能 预先感谢您的任何帮助 s
  • Typescript 类型、泛型和抽象类

    我尝试了一种对我来说似乎很奇怪的行为 让我们考虑以下示例 在 Typescript Playground 中测试它 abstract class FooAbstract abstract bar class Foo extends FooA
  • MonoTouch“无法 AOT 程序集”

    我正在使用 MonoTouch 6 2 并且我有一个应用程序可以在模拟器上构建并运行良好 但当我为实际设备构建时 会出现 无法 AOT 程序集 错误 有没有人见过这个 这是编译器的输出 Applications Xcode app Cont
  • 在 Tomcat 上运行 JasperViewer 作为 Web 应用程序的一部分

    我了解到贾斯珀浏览器 默认预览组件贾斯珀报告 is a Swing组件 那么有什么方法可以将其转换或嵌入到Web应用程序中吗 有人说我应该使用Java网络启动 但据我所知这个链接 JWS在客户端计算机上下载并安装应用程序非常有用 但这不是我
  • 如何禁用特定控件的视图状态?

  • jquery 显示 [object object] 而不是数组 [重复]

    这个问题在这里已经有答案了 只是试图在视图中显示我通过 ajax 从控制器获取的数组 但它显示 object Object object Object 而不是数组 请检查我的js文件如下 faq title click function v
  • 如何在 PHP 中循环使用十六进制颜色代码?

    我想要一个数组 其中数组中的每个字段都包含一个颜色代码 array 0 gt 4CFF00 1 gt FFE97F 我希望它能够经历从绿色到黑色的整个颜色范围 绿色 gt 蓝色 gt 深蓝色 gt 紫色 gt 黄色 gt 橙色 gt 红色
  • 前面带有“0”的数字文字[重复]

    这个问题在这里已经有答案了 Using insert 我将值推入Array as myarray 22 33 44 myarray insert 0 02 gt 2 22 33 44 如果执行以下操作 我得到 myarray insert
  • 获取数组中特定项目的索引

    我想检索数组的索引 但我只知道数组中实际值的一部分 例如 我在数组中动态存储作者姓名 author xyz 现在我想找到包含它的数组项的索引 因为我不知道值部分 这个怎么做 您可以使用查找索引 var index Array FindInd
  • javascript 创建日期错误的月份

    使用 Mozilla Firefox Firebug var myDate new Date 2012 9 23 0 0 0 0 myDate 日期 2012 年 10 月 23 日星期二 00 00 00 GMT 0400 东部夏令时间
  • 如何验证机器人是否正在输入信息

    我有一个网络表单 用户填写该表单并将信息发送到服务器并存储在数据库中 我担心机器人可能只是填写表格 而我最终会得到一个充满无用记录的数据库 如何防止机器人填写我的表格 我在想也许类似于 Stackoverflow 的机器人检测 如果它认为你
  • 如何在 Ubuntu 16.04 上使用带有 Python 3.7 的 sqlite3 python 模块的 FTS5 扩展?

    为了测试带有 sqlite3 Python 模块的 FTS5 扩展是否有效 我使用了这个code from 技术进步 import sqlite3 conn sqlite3 connect memory conn execute creat
  • 在javascript中将输入框滚动到光标位置

    我编写了一个简单的 JS 函数 当输入框接收焦点时 它将光标置于输入框内容的末尾 框中最常见的操作是追加 我没有在 IE 中检查过 但是当文本多于可见文本时 即使将光标移动到输入末尾也不会在 Firefox 3 6 中将视图滚动到输入末尾
  • MacOS 上列表中的 SwiftUI 键盘导航

    我正在尝试实现一个可以使用箭头键 向上 向下 导航的列表 我已经创建了布局 但现在我不完全理解如何 以及在 哪里 拦截向上 向下键 以便我可以添加自定义逻辑 我已经尝试过了onMoveCommand with focusable但这不起作用
  • 从 Hive 表将 DataFrame 中的 ArrayBuffer 转换为 HashSet 到 RDD 时出现 GenericRowWithSchema 异常

    我有一个镶木地板格式的 Hive 表 是使用生成的 create table myTable var1 int var2 string var3 int var4 string var5 array