Spark 驱动程序不会因异常而崩溃

2024-03-29

我们在 Kubernetes 上以客户端模式运行 Spark 3.1.1。

我们是一个简单的 scala Spark 应用程序,它从 S3 加载 parquet 文件并聚合它们:

sparkSession.read.parquet(paths).as[MyRawEvent]

我们的应用程序在快乐路径上完美运行:驱动程序 Pod 开始运行,执行程序 Pod 加入,当应用程序完成时,执行程序和驱动程序都会终止。

另一方面,如果出现问题,驱动程序 + 执行程序 Pod 都会保持开启状态Running状态。例如,如果以下之一发生异常(在驱动程序中):paths上面不存在:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://<bucket-name>/client-id=8765432/date=2021-08-06
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
     at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
     at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
     at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
     at scala.util.Success.$anonfun$map$1(Try.scala:255)
     at scala.util.Success.map(Try.scala:213)
     at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
     at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
     at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
     at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

有趣的是,这个异常并不会阻止执行程序立即启动,并且驱动程序和执行程序 Pod 都会永远卡住,什么都不做。

我们没有在应用程序中捕获异常,并且我们期望驱动程序和执行程序将停止,而不是浪费冗余资源。

我们怎样才能粉碎应用程序,使它不会留在里面Running状态永远?


嗯,这很简单。

我必须捕获所有异常,以确保无论如何都关闭 Spark 上下文:

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate
    try {
      // application code with potential exceptions
    } catch {
      case exception: Exception =>
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }

这样所有资源都被释放,并且驱动程序 pod 的状态更改为Error作为例外。

EDIT- 以 Scala 方式进行:

  def main(args: Array[String]): Unit = {
    // some code
    implicit val sparkSession = SparkSession.builder().getOrCreate
    Try {
      // application code with potential exceptions
    } match {
      case Success(_) => None
      case Failure(exception) =>
        sparkSession.close()
        throw exception
    }

    sparkSession.close()
  }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 驱动程序不会因异常而崩溃 的相关文章

随机推荐

  • 为什么要对授权标头进行 Base64 编码?

    Twitter 的 API 需要发送一个授权标头 该标头是 API 密钥与 API 密钥连接的 Base64 编码 在节点中 我使用 var base64 new Buffer apiKey apiSecret toString base6
  • 发送被 GetAsyncKeyState() 忽略的虚拟鼠标点击?

    我一整天都在尝试和搜索 但我一生都无法弄清楚如何做到这一点 正如标题所示 我希望能够发送被 GetAsyncKeyState 之类的东西忽略的鼠标点击 基本上我在做什么 While physically holding left mouse
  • 更改 django 设置后 uwsgi 不会重新加载

    我已经设置了 uwsgi 在 nginx 后面为 django 提供服务 然后我在 django 设置中更改数据库 但 uwsgi 仍然显示具有旧数据库的站点 我也尝试过这个建议 https serverfault com a 411363
  • 将内联 SVG 转换为 png 时出现样式错误

    我的高级目标是转变 div 包含一些内联 svg 图像到 png 文件的元素 所有操作都必须使用 JavaScript 在客户端浏览器中执行 我努力了 使用 canvg 库并按照本文中的步骤操作 https github com nikla
  • 如何使用 Python 的 Bokeh 向日期时间轴添加更多 x 轴刻度和标签?

    我一直在测试 Python 的 Bokeh 特别是烛台图表工具 但一直无法弄清楚如何向我的图表添加超过 5 个日期时间标签 刻度 任何见解将不胜感激 这是代码 from math import pi import pandas as pd
  • 防止在 Xcode 中使用 iOS iPhone 应用程序部署(禁用)WatchKit 应用程序

    我们还有一个 Xcode 项目 其中包含 Apple Watch 的构建配置 我们的 Apple Watch 应用尚未准备就绪 因此我们希望在禁用它的情况下发布 在构建中禁用 Apple Watch 功能的最佳方法是什么 删除 WatchK
  • 为什么所有位置/大小都是双精度格式?

    基本上 我会说坐标是 基于像素的 很容易理解 10 10 处的像素或者100像素的宽度 但由于位置和大小采用双精度格式 因此大小可以为 100 6 位置可以为 10 1 50 9 小数值对位置 大小有影响吗 事实上 我通过计算点来生成自定义
  • 如何通过联系表 7 获取帖子标题

    我正在网站中使用联系表 7 获取REQUEST QUOTE每个产品 链接到示例 http rocketuk cgtmarketing com led light components 当访客提交REQUEST QUOTE然后我收到一封带有脱
  • 用于优化目的的简单编译器

    我想要一个简单编译器的源代码 通过交换延迟分支的代码来优化我的作业 我读了有没有针对小语言的简单编译器 https stackoverflow com questions 1913621 is there a simple compiler
  • Oracle DELETE sql 与 JOIN 不起作用

    我的删除语句在 Oracle 中返回 933 错误 我不确定出了什么问题 DELETE b from temp a JOIN fact tab b on a col1 b col1 and a col2 b col2 and a col3
  • 将 DLL 添加到资产文件夹时,无法运行引用重写器,出现命令错误和统一错误

    当我将 DAI dll 添加到资产文件夹时 我遇到了此错误 当我尝试构建时会发生此错误 UnityException 无法使用命令运行引用重写器 target Temp StagingArea DAI dll additionalrefer
  • 将 Objective-C 对象序列化和反序列化为 JSON

    我需要将 Objective C 对象序列化和反序列化为 JSON 以存储在 CouchDB 中 人们是否有通用解决方案最佳实践的示例代码 我查看了一些 JSON 框架 它们都停留在 NSDictionary NSArray 级别 即很多框
  • 防止 React Native 中的双击

    如何防止用户在 React Native 中点击按钮两次 即用户不能在可触摸的突出显示上快速点击两次 https snack expo io patwoz withpreventdoubleclick https snack expo io
  • 以编程方式或声明方式要求 IIS 中单个 asp.net 页面的客户端证书

    标题几乎说明了这一点 我已经推出了一个带有 SSL 证书的 IIS 7 网站 现在愿意为单个页面设置 SSL 设置 客户端证书 接受 但是以编程方式或声明方式 我找到了一种使用 IIS 管理器执行此操作的方法 但由于某些基础结构限制 我们需
  • 具有 beginwait 函数的信号量

    我正在使用 begin end 编写一个异步库 并且需要锁定对象 目前 我正在使用信号量执行此操作 但调用semaphore WaitOne 在调用该线程的地方挂起该线程 我宁愿使用像 BeginWait 这样的东西 这样它会立即返回并在信
  • Python使用sudo启动时找不到模块

    我有一个使用 Google Assistant 库的脚本 并且必须从那里导入一些模块 我发现这只适用于 Python 虚拟环境 这真的很奇怪 在同一个文件夹中 我有一个使用 GPIO 引脚并且必须使用 root 的脚本 它们相互交互 因此当
  • 文本区域值未随表单一起发布

    我在提交表单时尝试输入文本区域标记
  • 使用 CXF Web 服务进行服务器端 XML 验证

    我正在开发 Apache CXF Web 服务 使用 JAX WS 通过 SOAP 该服务本身非常简单 接收请求 将请求插入数据库 然后返回插入是否成功 我想依靠 XML 验证来对请求实施一些约束 那么 我的问题 如何向我的服务客户返回详细
  • 如何拒绝所有用户删除表

    在 SQL Server 2005 中 有没有一种方法可以使用单个语句来拒绝删除行 在数据库所有用户的特定表中 尝试这个 CREATE TRIGGER yourTriggerName ON YourTableName INSTEAD OF
  • Spark 驱动程序不会因异常而崩溃

    我们在 Kubernetes 上以客户端模式运行 Spark 3 1 1 我们是一个简单的 scala Spark 应用程序 它从 S3 加载 parquet 文件并聚合它们 sparkSession read parquet paths