我们在 Kubernetes 上以客户端模式运行 Spark 3.1.1。
我们是一个简单的 scala Spark 应用程序,它从 S3 加载 parquet 文件并聚合它们:
sparkSession.read.parquet(paths).as[MyRawEvent]
我们的应用程序在快乐路径上完美运行:驱动程序 Pod 开始运行,执行程序 Pod 加入,当应用程序完成时,执行程序和驱动程序都会终止。
另一方面,如果出现问题,驱动程序 + 执行程序 Pod 都会保持开启状态Running
状态。例如,如果以下之一发生异常(在驱动程序中):paths
上面不存在:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
有趣的是,这个异常并不会阻止执行程序立即启动,并且驱动程序和执行程序 Pod 都会永远卡住,什么都不做。
我们没有在应用程序中捕获异常,并且我们期望驱动程序和执行程序将停止,而不是浪费冗余资源。
我们怎样才能粉碎应用程序,使它不会留在里面Running
状态永远?
嗯,这很简单。
我必须捕获所有异常,以确保无论如何都关闭 Spark 上下文:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
try {
// application code with potential exceptions
} catch {
case exception: Exception =>
sparkSession.close()
throw exception
}
sparkSession.close()
}
这样所有资源都被释放,并且驱动程序 pod 的状态更改为Error
作为例外。
EDIT- 以 Scala 方式进行:
def main(args: Array[String]): Unit = {
// some code
implicit val sparkSession = SparkSession.builder().getOrCreate
Try {
// application code with potential exceptions
} match {
case Success(_) => None
case Failure(exception) =>
sparkSession.close()
throw exception
}
sparkSession.close()
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)