以编程方式提交作业时 Spark EC2 集群上出现 java.io.EOFException

2023-12-02

真的需要你的帮助来理解我做错了什么。

我实验的目的是以编程方式运行 Spark 作业,而不是使用 ./spark-shell 或 ./spark-submit (这些都对我有用)

环境: 我使用 ./spark-ec2 脚本创建了一个包含 1 个主节点和 1 个工作节点的 Spark 集群

然而,当我尝试运行打包在 jar 中的代码时,集群看起来不错:

val logFile = "file:///root/spark/bin/README.md"

val conf = new SparkConf()
conf.setAppName("Simple App")
conf.setJars(List("file:///root/spark/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar"))
conf.setMaster("spark://ec2-54-89-51-36.compute-1.amazonaws.com:7077")

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(_.contains("a")).count()
val numBs = logData.filter(_.contains("b")).count()
println(s"1. Lines with a: $numAs, Lines with b: $numBs")

我得到一个例外:

*[info] Running com.paycasso.SimpleApp 
14/09/05 14:50:29 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/09/05 14:50:29 INFO SecurityManager: Changing view acls to: root
14/09/05 14:50:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/05 14:50:30 INFO Slf4jLogger: Slf4jLogger started
14/09/05 14:50:30 INFO Remoting: Starting remoting
14/09/05 14:50:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:54683]
14/09/05 14:50:30 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:54683]
14/09/05 14:50:30 INFO SparkEnv: Registering MapOutputTracker
14/09/05 14:50:30 INFO SparkEnv: Registering BlockManagerMaster
14/09/05 14:50:30 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140905145030-85cb
14/09/05 14:50:30 INFO MemoryStore: MemoryStore started with capacity 589.2 MB.
14/09/05 14:50:30 INFO ConnectionManager: Bound socket to port 47852 with id = ConnectionManagerId(ip-10-224-14-90.ec2.internal,47852)
14/09/05 14:50:30 INFO BlockManagerMaster: Trying to register BlockManager
14/09/05 14:50:30 INFO BlockManagerInfo: Registering block manager ip-10-224-14-90.ec2.internal:47852 with 589.2 MB RAM
14/09/05 14:50:30 INFO BlockManagerMaster: Registered BlockManager
14/09/05 14:50:30 INFO HttpServer: Starting HTTP Server
14/09/05 14:50:30 INFO HttpBroadcast: Broadcast server started at http://**.***.**.**:49211
14/09/05 14:50:30 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e2748605-17ec-4524-983b-97aaf2f94b30
14/09/05 14:50:30 INFO HttpServer: Starting HTTP Server
14/09/05 14:50:31 INFO SparkUI: Started SparkUI at http://ip-10-224-14-90.ec2.internal:4040
14/09/05 14:50:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/05 14:50:32 INFO SparkContext: Added JAR file:///root/spark/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar at http://**.***.**.**:46491/jars/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar with timestamp 1409928632274
14/09/05 14:50:32 INFO AppClient$ClientActor: Connecting to master spark://ec2-54-89-51-36.compute-1.amazonaws.com:7077...
14/09/05 14:50:32 INFO MemoryStore: ensureFreeSpace(163793) called with curMem=0, maxMem=617820979
14/09/05 14:50:32 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 160.0 KB, free 589.0 MB)
14/09/05 14:50:32 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140905145032-0005
14/09/05 14:50:32 INFO AppClient$ClientActor: Executor added: app-20140905145032-0005/0 on worker-20140905141732-ip-10-80-90-29.ec2.internal-57457 (ip-10-80-90-29.ec2.internal:57457) with 2 cores
14/09/05 14:50:32 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140905145032-0005/0 on hostPort ip-10-80-90-29.ec2.internal:57457 with 2 cores, 512.0 MB RAM
14/09/05 14:50:32 INFO AppClient$ClientActor: Executor updated: app-20140905145032-0005/0 is now RUNNING
14/09/05 14:50:33 INFO FileInputFormat: Total input paths to process : 1
14/09/05 14:50:33 INFO SparkContext: Starting job: count at SimpleApp.scala:26
14/09/05 14:50:33 INFO DAGScheduler: Got job 0 (count at SimpleApp.scala:26) with 1 output partitions (allowLocal=false)
14/09/05 14:50:33 INFO DAGScheduler: Final stage: Stage 0(count at SimpleApp.scala:26)
14/09/05 14:50:33 INFO DAGScheduler: Parents of final stage: List()
14/09/05 14:50:33 INFO DAGScheduler: Missing parents: List()
14/09/05 14:50:33 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:26), which has no missing parents
14/09/05 14:50:33 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:26)
14/09/05 14:50:33 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/09/05 14:50:36 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:36966/user/Executor#2034537974] with ID 0
14/09/05 14:50:36 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: ip-10-80-90-29.ec2.internal (PROCESS_LOCAL)
14/09/05 14:50:36 INFO TaskSetManager: Serialized task 0.0:0 as 1880 bytes in 8 ms
14/09/05 14:50:37 INFO BlockManagerInfo: Registering block manager ip-10-80-90-29.ec2.internal:59950 with 294.9 MB RAM
14/09/05 14:50:38 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/05 14:50:38 WARN TaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)*

我实际上正在做的是调用“sbt run”。所以我组装了 scala 项目并运行它。 顺便说一句,我在主主机上运行该项目,因此驱动程序对于工作主机来说肯定是可见的。 任何帮助表示赞赏。这很奇怪,这样一个简单的例子在集群中不起作用。我相信使用 ./spark-submit 并不方便。 提前致谢。


浪费了很多时间后,我发现了问题所在。尽管我没有在我的应用程序中使用 hadoop/hdfs,但 hadoop 客户端很重要。问题出在 hadoop-client 版本中,它与构建 Spark 的 hadoop 版本不同。 Spark 的 hadoop 版本是 1.2.1,但在我的应用程序中是 2.4。

当我在应用程序中将 hadoop 客户端版本更改为 1.2.1 时,我可以在集群上执行 Spark 代码。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

以编程方式提交作业时 Spark EC2 集群上出现 java.io.EOFException 的相关文章

  • Scala 功能设计模式目录

    一周以来我一直在阅读 Scala 编程 作者一步一步地介绍了该语言的元素 但我仍然很困惑何时使用演员 闭包 柯里化等功能性的东西 我正在寻找功能结构的典型用例或最佳实践的目录 我并不是说在 Scala 中重新实现像 GoF 这样的众所周知的
  • 从 pySpark 中的字典构建一行

    我正在尝试在 pySpark 1 6 1 中动态构建一行 然后将其构建到数据帧中 总体思路是扩展结果describe例如 包括偏斜和峰度 这是我认为应该起作用的 from pyspark sql import Row row dict C0
  • 如何设置 jacoco4sbt 来处理 Play 中主模块和子模块中的类?

    我有一些问题要解决雅可可4sbt https github com sbt jacoco4sbt正在使用我的 Play 2 3 4 项目 我的项目由 3 个子模块组成 common api and frontend并且没有代码app根文件夹
  • 创建自定义 scala 集合,其中映射默认返回自定义集合?

    特质TraversableLike A Repr 允许人们在其中进行收藏some函数将返回一个Repr 而其他人则继续返回类型参数That在功能上 有没有办法定义一个CustomCollection A 其中函数如map 其他的默认That
  • 无法在 Windows 10 中启动 Spark Master

    我是 Spark 新手 我正在尝试手动启动 master 在 Windows 10 中使用 MINGW64 当我这样做时 Downloads spark 1 5 1 bin hadoop2 4 spark 1 5 1 bin hadoop2
  • Apache Spark 何时发生混洗?

    我正在优化 Spark 中的参数 并且想确切地了解 Spark 是如何对数据进行洗牌的 准确地说 我有一个简单的字数统计程序 并且想知道spark shuffle file buffer kb如何影响运行时间 现在 当我将此参数设置得非常高
  • AWS - 如何从 CloudWatch Alarm 重新启动或重新启动 EC2 实例?

    有时我的应用程序会无缘无故地死掉 我可以使用 CloudWatch 和 CPU 使用指标下降来检测到这一点 此时我想重新启动java应用程序或整个EC2实例 有什么建议我怎样才能实现这一目标 Thanks AWS 云观察 https aws
  • Build.scala中%和%%符号含义

    我是新来玩的 Framework 2 1 java版本 并且没有scala经验 我不明白什么是以及什么是 and 在 Build scala 中表示 我用谷歌搜索了它们但找不到它们的含义 在我的 Build scala 文件中 我有 org
  • 相当于 scala 中的 python repr()

    有没有相当于Python的东西reprscala 中的函数 即 您可以给任何 scala 对象提供一个函数 它将生成该对象的字符串表示形式 该对象是有效的 scala 代码 eg val l List Map 1 gt a print re
  • 如何在 Spark 数据帧 groupBy 中执行 count(*)

    我的目的是做相当于基本sql的事情 select shipgrp shipstatus count cnt from shipstatus group by shipgrp shipstatus 我见过的 Spark 数据帧的示例包括其他列
  • AWS EC2 上的 Wordpress - 分配弹性 IP 后损坏

    所以 我安装了 WordPress 并且运行得很好 我可以通过从实例获得的公共 DNS 访问该站点和 wp admin 但是 一旦我创建了弹性 IP 并将其与实例关联 我就无法再访问 wp admin 并且主页样式表和 JavaScript
  • WSClient - 打开的文件太多

    我正在 CentOS 6 上使用 Play Framework 2 4 我的应用程序抛出此异常 java net SocketException Too many open files 我在 Stack Overflow 上搜索了很多主题并
  • 承诺的反面是什么?

    承诺代表将来可能可用 或无法实现 的值 我正在寻找的是一种数据类型 它表示将来可能变得不可用的可用值 可能是由于错误 Promise a b TransitionFromTo
  • 比较 javascript 元素和 scala 变量的 Play 框架 Twirl 模板

    如下面的代码示例所示 我想比较 scala 辅助元素内的 javascript 元素 然而 即使存在元素 abcde 它也始终返回 false 除了使用标签之外 如何获取 scala 辅助元素内的 javascript 值 appSeq S
  • 为什么 Spark 退出并显示 exitCode: 16?

    我将 Spark 2 0 0 与 Hadoop 2 7 一起使用 并使用纱线集群模式 每次 我都会收到以下错误 17 01 04 11 18 04 INFO spark SparkContext Successfully stopped S
  • Map 和 Set 的实际类(不是抽象类,也不是特征类)是什么?

    在 Scala 中 映射和集合文字可以通过以下方式创建 val m Map 1 gt a 以及引用的类型m字面意思都是Map Int String 然而 scala文档表明Map实际上是一个特征 具有需要实现才能实例化的抽象成员 scala
  • 在 AKKA 中,对主管调用 shutdown 是否会停止其监督的所有参与者?

    假设我有一位主管连接了 2 位演员 当我的应用程序关闭时 我想优雅地关闭这些参与者 调用supervisor shutdown 是否会停止所有参与者 还是我仍然需要手动停止我的参与者 gracias 阻止主管 https github co
  • 与文件名中的冒号“:”作斗争

    我有以下代码 用于加载大量 csv gz 并将它们转储到其他文件夹中 并将源文件名作为一列 object DailyMerger extends App def allFiles path File List File val parts
  • 导入 sbt 项目时出错,服务器访问错误,未解决的依赖项

    我正在尝试从 IntelliJ IDE 15 0 2 的 build sbt 中导入我的项目中的库 我不断收到未解决的依赖项错误 我尝试更新不同论坛的设置来解决该问题 但没有任何效果 我尝试过的几件事 使用代理设置更新 sbtconfig
  • 如何将 JVM 选项传递给 SBT 以在运行应用程序或测试用例时使用?

    我想在运行我的应用程序或通过 SBT 对应用程序进行测试时指定 JVM 选项 具体来说 我需要能够为 JVM 提供 Djava security policy 参数 以便加载我的策略并用于测试 我怎样才能用 SBT 做到这一点 With x

随机推荐