Apache Spark:如何取消代码中的作业并终止正在运行的任务?

2024-01-08

我正在 Hadoop 集群上运行 Spark 应用程序(版本 1.6.0),并在客户端模式下使用 Yarn(版本 2.6.0)。我有一段运行长时间计算的代码,如果它花费的时间太长,我想杀死它(然后运行一些其他函数)。
这是一个例子:

val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()

try {
    Await.result(future, Duration(30, TimeUnit.SECONDS))
    println("success!")
} catch {
    case _:Throwable =>
        future.cancel()
        println("timeout")
}

// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()

超时设置为 30 秒,但当然计算是无限的,因此等待 future 的结果将抛出异常,该异常将被捕获,然后 future 将被取消并执行备份函数。
这一切都运行得很好,只是取消的作业没有完全终止:当查看应用程序的 Web UI 时,作业被标记为失败,但我可以看到内部仍然有正在运行的任务。

当我使用 SparkContext.cancelAllJobs 或 SparkContext.cancelJobGroup 时,也会发生同样的情况。问题是,即使我设法继续执行我的程序,已取消作业的正在运行的任务仍然占用宝贵的资源(这最终会让我的速度几乎停止)。

总而言之:如何以同时终止该作业的所有正在运行的任务的方式终止 Spark 作业? (与现在发生的情况相反,即停止作业运行新任务,但让当前正在运行的任务完成)

UPDATE:
在忽视这个问题很长时间之后,我们找到了一个凌乱但有效的小解决方法。我们没有尝试从 Spark 应用程序中终止相应的 Spark 作业/阶段,而是只是在超时发生时记录所有活动阶段的阶段 ID,并向用于终止的 Spark Web UI 提供的 URL 发出 HTTP GET 请求说阶段。


我不知道这回答了你的问题。 我的需要是终止挂起时间过长的作业(我的作业从 Oracle 表中提取数据,但由于某些未知的原因,连接很少会永远挂起)。

经过一番研究,我得出了这个解决方案:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.JobExecutionStatus

val MAX_JOB_SECONDS = 100
val statusTracker = sc.statusTracker;
val sparkListener = new SparkListener()  
{ 
    
    override def onJobStart(jobStart : SparkListenerJobStart)     
    {
        val jobId = jobStart.jobId
        val f = Future 
        {
            var c = MAX_JOB_SECONDS;
            var mustCancel = false;
            var running = true;
            while(!mustCancel && running)
            {
                Thread.sleep(1000);
                c = c - 1;
                mustCancel = c <= 0;
                val jobInfo = statusTracker.getJobInfo(jobId);
                if(jobInfo!=null)
                {
                    val v = jobInfo.get.status()
                    running = v == JobExecutionStatus.RUNNING
                }
                else
                    running = false;
            }
            if(mustCancel)
            {
              sc.cancelJob(jobId)
            }
        }
    }
}
sc.addSparkListener(sparkListener)
try
{
    val df = spark.sql("SELECT * FROM VERY_BIG_TABLE") //just an example of long-running-job
    println(df.count)
}
catch
{
    case exc: org.apache.spark.SparkException =>
    {
        if(exc.getMessage.contains("cancelled"))
            throw new Exception("Job forcibly cancelled")
        else
            throw exc
    }
    case ex : Throwable => 
    {
        println(s"Another exception: $ex")
    }
}
finally
{
    sc.removeSparkListener(sparkListener)
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark:如何取消代码中的作业并终止正在运行的任务? 的相关文章

随机推荐