I was told that I can build a Spark application against one version of Spark and, as long as I use sbt assembly to build it, run it with spark-submit on any Spark cluster.
So I built a simple application with Spark 2.1.1. You can see my build.sbt file below. Then I launched it on my cluster:
cd spark-1.6.0-bin-hadoop2.6/bin/
spark-submit --class App --master local[*] /home/oracle/spark_test/db-synchronizer.jar
As you can see, I am executing it with Spark 1.6.0.
I get this error:
17/06/08 06:59:20 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]
java.lang.NoSuchMethodError: org.apache.spark.SparkConf.getTimeAsMs(Ljava/lang/String;Ljava/lang/String;)J
at org.apache.spark.streaming.kafka010.KafkaRDD.<init>(KafkaRDD.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:219)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/06/08 06:59:20 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:23 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:26 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:29 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@ac5b61d,BlockManagerId(<driver>, localhost, 26012))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
... 1 more
17/06/08 06:59:39 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:42 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 2 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:45 WARN AkkaUtils: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))] in 3 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
17/06/08 06:59:48 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(<driver>,[Lscala.Tuple2;@5e4d0345,BlockManagerId(<driver>, localhost, 26012))]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-1309342978]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194)
... 1 more
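From some reading, I found that java.lang.NoSuchMethodError usually means the code is running against a different version of Spark than the one it was built with. That may well be the case here, since I am using different versions. But shouldn't sbt assembly cover that? Please see my build.sbt and assembly.sbt files below.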
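To check which jar the Spark classes are actually loaded from at runtime, something like the following could be run through the same spark-submit. This is only a sketch: `ClasspathCheck` and `locationOf` are hypothetical names that are not part of my project, and it relies on plain JVM reflection rather than any Spark API.

```scala
object ClasspathCheck {
  // Returns the jar or directory a class was loaded from,
  // or a short note when that cannot be determined.
  def locationOf(className: String): String =
    try {
      // initialize = false: only locate the class, do not run its static initializers
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
        .getOrElse("unknown (no code source)")
    } catch {
      case _: ClassNotFoundException => "not on classpath"
    }

  def main(args: Array[String]): Unit = {
    // The two classes named in the NoSuchMethodError stack trace
    println("SparkConf loaded from: " + locationOf("org.apache.spark.SparkConf"))
    println("KafkaRDD loaded from:  " + locationOf("org.apache.spark.streaming.kafka010.KafkaRDD"))
  }
}
```

If the two locations differ (for example, SparkConf from the cluster's Spark installation and KafkaRDD from db-synchronizer.jar), that would suggest the classes bundled by sbt assembly are being mixed with the cluster's own Spark classes.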
build.sbt
name := "spark-db-synchronizator"
//Versions
version := "1.0.0"
scalaVersion := "2.10.6"
val sparkVersion = "2.1.1"
val sl4jVersion = "1.7.10"
val log4jVersion = "1.2.17"
val scalaTestVersion = "2.2.6"
val scalaLoggingVersion = "3.5.0"
val sparkTestingBaseVersion = "1.6.1_0.3.3"
val jodaTimeVersion = "2.9.6"
val jodaConvertVersion = "1.8.1"
val jsonAssertVersion = "1.2.3"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.slf4j" % "slf4j-api" % sl4jVersion,
  "org.slf4j" % "slf4j-log4j12" % sl4jVersion exclude("log4j", "log4j"),
  "log4j" % "log4j" % log4jVersion % "provided",
  "org.joda" % "joda-convert" % jodaConvertVersion,
  "joda-time" % "joda-time" % jodaTimeVersion,
  "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
  "com.holdenkarau" %% "spark-testing-base" % sparkTestingBaseVersion % "test",
  "org.skyscreamer" % "jsonassert" % jsonAssertVersion % "test"
)
assemblyJarName in assembly := "db-synchronizer.jar"
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in(Compile, run), runner in(Compile, run))
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in(Compile, run))
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
// Spark does not support parallel tests and requires JVM fork
parallelExecution in Test := false
fork in Test := true
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")