I am developing a Spark SQL program and I am getting the following exception:
16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at scala.Option.getOrElse(Option.scala:121)
at com.somecompany.ml.Main$.main(Main.scala:46)
at com.somecompany.ml.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])
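The last part of my own code that I can identify in the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
which leads me to this line: profilesDF.cache()
Before caching, I perform a union between two DataFrames. I have seen an answer about persisting both DataFrames before the join here: https://stackoverflow.com/questions/36290486/spark-job-restarted-after-showing-all-jobs-completed-and-then-fails-timeoutexce I still need to cache the unioned DataFrame, since I use it in several of my transformations.
I was wondering what may cause this exception to be thrown?
Searching for it led me to a link dealing with an RPC timeout exception or some security issues, which is not my problem.
If you have any idea how to solve it I would obviously appreciate it, but even just understanding the problem would help me solve it.
Thanks in advance
Question: I was wondering what may cause this exception to be thrown?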
Answer :
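spark.sql.broadcastTimeout (default: 300): timeout in seconds for the broadcast wait time in broadcast joins. See http://spark.apache.org/docs/latest/sql-programming-guide.html
spark.network.timeout (default: 120s): default timeout for all network interactions.
Settings such as spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, and spark.kryoserializer.buffer.max (if you are using Kryo serialization) are tuned to larger-than-default values in order to handle complex queries. You can start with these values and adjust them according to your SQL workloads.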
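Note: the docs (http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options) say:
The following options (see the spark.sql.* properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.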
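As a rough illustration of the settings named above, here is a minimal sketch of how they could be applied from code. It assumes a Spark 1.x application (the stack trace references DataFrame.scala and SQLContext-era classes) with Spark on the classpath and the master supplied by spark-submit. The object name and every value are hypothetical placeholders, not recommendations. Note that the trace reports a wait of 3000 seconds, which suggests the broadcast timeout had already been raised well above the default of 300, so a larger value alone may not be enough.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical sketch: the object name and all values are illustrative assumptions.
object TimeoutConfigSketch {
  def main(args: Array[String]): Unit = {
    // Core settings are fixed when the SparkContext is created.
    val conf = new SparkConf()
      .setAppName("timeout-config-sketch")
      .set("spark.network.timeout", "600s")            // default is 120s
      .set("spark.kryoserializer.buffer.max", "256m")  // only relevant with Kryo serialization

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // SQL settings can be changed at runtime; this one is in seconds (default 300).
    sqlContext.setConf("spark.sql.broadcastTimeout", "1200")

    // Not part of the answer above: broadcast joins can also be switched off,
    // so the planner never waits on a broadcast at all.
    // sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

    println(sqlContext.getConf("spark.sql.broadcastTimeout"))
    sc.stop()
  }
}
```

The same keys can also be passed on the command line with spark-submit --conf key=value, which avoids recompiling while experimenting with different values.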
Also, for a better understanding, you can look at BroadcastHashJoin https://github.com/jiadongy/Spark.SkewAdaptive/blob/a3429c5547153dbc2320456fc0df4edecf91d762/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala whose doExecute method is the trigger point of the stack trace above.
protected override def doExecute(): RDD[Row] = {
  // Blocks until the broadcast side is ready; throws TimeoutException once `timeout` elapses.
  val broadcastRelation = Await.result(broadcastFuture, timeout)
  streamedPlan.execute().mapPartitions { streamedIter =>
    hashJoin(streamedIter, broadcastRelation.value)
  }
}
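The mechanism behind the exception can be reproduced without Spark. The sketch below is a plain Scala stand-in, not Spark's code: slowBuild and waitFor are hypothetical helpers, with slowBuild playing the role of broadcastFuture and the limit playing the role of the broadcast timeout. When the future is not complete within the limit, Await.result throws the same java.util.concurrent.TimeoutException seen at the top of the stack trace.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitTimeoutSketch {
  // Hypothetical stand-in for building the broadcast relation: sleeps, then returns.
  def slowBuild(millis: Long): Future[String] = Future {
    Thread.sleep(millis)
    "broadcast relation"
  }

  // Blocks for at most `limit`; returns Left with the exception message if the wait expires.
  def waitFor(f: Future[String], limit: FiniteDuration): Either[String, String] =
    try Right(Await.result(f, limit))
    catch { case e: TimeoutException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // The work needs 2 s but the caller waits only 200 ms, so the wait expires.
    println(waitFor(slowBuild(2000), 200.millis))
    // The work finishes well inside the limit, so the value is returned.
    println(waitFor(slowBuild(50), 2.seconds))
  }
}
```

In the Spark case this would mean the broadcast side of the join (collecting it and shipping it to the executors) did not finish within the configured wait, so either the wait is too short or the broadcast side is too large or too slow to compute.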