Spark集群中RDD映射函数内部调用函数

2024-01-15

我正在测试我在代码中定义的一个简单的字符串解析器函数,但其​​中一个工作节点在执行时总是失败。这是我一直在测试的虚拟代码:

/* JUST A SIMPLE PARSER TO CLEAN PARENTHESIS */
def parseString(field: String): String = {
    val Pattern = "(.*.)".r
    field match{
        case "null" => "null"
        case Pattern(field) => field.replace('(',' ').replace(')',' ').replace(" ", "")
    }
}

/* CREATE TWO DISTRIBUTED RDDs TO JOIN THEM */
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)), 6)
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)), 6)
val manipulated_emp = emp.keyBy(t => t._3)
val manipulated_dept = dept.keyBy(t => t._2)
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)

/* OUTPUT */
left_outer_join_data.collect.foreach(println)
/*
(30,((3,matt,30),Some((hive,30))))
(30,((5,rhonda,30),Some((hive,30))))
(20,((2,ricky,20),Some((spark,20))))
(10,((1,jordan,10),Some((hadoop,10))))
(35,((4,mince,35),None))
*/

val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
.collect

res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)

/* DESIRED OUTPUT */
/*
(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)
*/

如果我收集以下结果,此代码将有效res首先在驾驶员中。由于这是一个测试,因此这样做没有问题,但我的实际应用程序将处理数百万行,并且不鼓励在驱动程序中收集结果。所以如果我也这样做无需先收集, 像这样:

val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))

res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)

我得到以下信息:

ERROR TaskSetManager: Task 5 in stage 17.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 17.0 failed 4 times, most recent failure: Lost task 5.3 in stage 17.0 (TID 166, 192.168.28.101, executor 1): java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
        at tele.com.SimcardMsisdn$.main(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn.main(SimcardMsisdn.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

为什么 Spark 无法在节点上执行我的解析器?您能推荐一个解决方案或解决方法吗?

UPDATE

我找到了这个问题的解决方案(发布在下面),尽管如此,我仍然对这个问题感到困惑,也许是我做错了。


好吧,我已经通过广播自己设法解决了这个问题Pattern工人的变量:

val Pattern = sc.broadcast("(.*.)".r)

并在映射中而不是在函数中进行模式匹配,并且不收集到驱动程序:

val res = left_outer_join_data.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
res.map(f => (f._1, f._2, f._3 match {
        case "null" => "null"
        case Pattern.value(f._3) => f._3.replace('(',' ').replace(')',' ').replace(" ", "")})
    )
.foreach(println)

然后我从工作人员标准输出中得到了所需的输出:

(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark集群中RDD映射函数内部调用函数 的相关文章

随机推荐

  • 暂停和恢复 ScheduledExecutorService

    我正在写一个俄罗斯方块克隆 我想让碎片每 60 秒落下得更快一点 为此我使用了预定执行服务 http docs oracle com javase 7 docs api java util concurrent ScheduledExecu
  • 创建运行时确定类型实例的最佳方法[重复]

    这个问题在这里已经有答案了 创建运行时确定的类型实例的最佳方法 在 NET 4 中 是什么 我有一个实例方法 虽然作用于 BaseClass 对象 但可以由其派生类的实例调用 我需要创建另一个相同类型的实例this方法内 为每个派生类重载方
  • 使用 ToArray() 将列表转换为数组

    我创建了一个名为 listItem 的类和以下列表 List
  • 重新申报错误

    我已经理解声明和定义之间的区别 当我遇到疑问时 我正在练习一些问题 下面的代码要求我列出代码片段中的错误 f int a int b int a a 20 return a 为什么这会给出重新声明错误a 它不应该给出多重定义吗a因为在 f
  • 使用 APIController 补充 [FromUri] 序列化

    我们有多个 API 控制器接受 GET 请求 如下所示 FooController public IHttpActionResult Get FromUri Foo f BarController public IHttpActionRes
  • “未使用的导入警告”和 pylint

    因此 我正在使用 Python 开发一个项目 并尝试使其符合 pylint 以及一般情况下的标准 所以 我有一个源文件 我们将其称为 a py a py import loggingsetup def foo log info This i
  • C 堆栈中的递归[关闭]

    Closed 这个问题需要细节或清晰度 help closed questions 目前不接受答案 这是合并排序中分区的代码 我实际上无法理解 reusrion 在其中是如何工作的 合并排序分区 void partition int arr
  • 7z 命令行压缩文件夹

    我正在尝试使用 7zG exe 的命令行来压缩 7z 文件夹 我的代码适用于文件 但不适用于文件夹 有人可以告诉我使用 7z 命令行压缩文件夹的正确方法吗 以下是仅适用于文件的示例代码 每当我尝试运行此代码时 7zip 都会显示一个消息框
  • 如何在matlab中编写指标函数

    我是 matlab 的新用户 我想解决以下问题 我想构造一个分段常数函数f f应该是一个匿名函数 例如f t 1 0 0 25 t 然而 分段常数函数的区间数一般不固定 相反 分段间隔取决于用户输入 例如 如果输入4 则分段间隔变为 0 0
  • WooCommerce 在订单状态完成时触发功能

    我试图在订单完成时触发一个功能 我正在使用这段代码 add action woocommerce order status completed array this payment complete 1 然后这个函数 public func
  • 异步等待进度报告不起作用

    我有一个 C WPF 程序 它打开一个文件 逐行读取它 操作每一行 然后将该行写入另一个文件 那部分工作得很好 我想添加一些进度报告 因此我将方法设为异步 并将 wait 与进度报告结合使用 进度报告非常简单 只需更新屏幕上的标签即可 这是
  • 从 RSA .pem 文件获取私钥 [重复]

    这个问题在这里已经有答案了 鉴于这种 pem文件 使用 openssl 生成并使用密码加密 BEGIN RSA PRIVATE KEY Proc Type 4 ENCRYPTED DEK Info DES EDE3 CBC AC009672
  • 在注释视图上使用 canShowCallout 时出现 MKMapKit 异常

    我正在尝试使用非常简单的自定义地图注释视图和标注 创建注释视图时 只需将 UIImageView 作为子视图添加到其自身 效果很好 但是 当我在注释视图上调用 canShowCallout 时 返回视图后立即在 MapKit 中引发异常 堆
  • 在数据库中记录 ActionMailer 发送的电子邮件的最佳方式?

    我需要记录通过 ActionMailer 发送的电子邮件 而简单的文本文件是不够的 我需要将日志存储在 ActiveRecord 模型中 电子邮件发送性能可能会受到影响 但在这种情况下 这是最好的选择 因为应用程序随后会定期访问日志 我还需
  • 合并android清单文件,过滤器冲突

    我正在尝试合并 Unity 中 2 个插件的 Android 清单文件 但是有两个活动具有相同的意图过滤器 我只能让其中一个同时工作 在这 2 个冲突的活动中 清单文件中位于顶部的活动有效 因此 如果清单 1 中的活动位于顶部 则插件 1
  • 为什么 bash 没有捕获来自 Java 的 process.destroy() 的信号[重复]

    这个问题在这里已经有答案了 用于测试的脚本 The script below just hangs until you press Ctrl C then it takes two seconds to shut down I wrote
  • std::function 的类型推导

    以下代码不会编译 因为在编译时未调用匹配的 std function 构造函数 template
  • Windows - 直接运行 .py 与运行 python blah.py 的行为不同

    我有一个使用子进程的 python 脚本 import subprocess print Running stuff subprocess check call do stuff bat print Stuff run 如果它被命名为 bl
  • 如何使用 FB.ui Javascript sdk 在 Facebook 上上传视频

    我正在使用 FB ui 方法将图像发布到 facebook 如下所述 FB ui display popup method feed name image link link picture image path caption capti
  • Spark集群中RDD映射函数内部调用函数

    我正在测试我在代码中定义的一个简单的字符串解析器函数 但其 中一个工作节点在执行时总是失败 这是我一直在测试的虚拟代码 JUST A SIMPLE PARSER TO CLEAN PARENTHESIS def parseString fi