Spark技术内幕读书笔记:Spark核心——DAGScheduler任务切分调度与TaskScheduler任务执行调度计算过程详解

2023-05-16

————Spark技术内幕读书笔记————

本书的三个核心:

  • RDD实现详解
  • Scheduler:DAGScheduler任务切分调度与TaskScheduler任务执行调度计算过程详解
  • 性能调优详解

 

 

Scheduler任务调度充分体现了与MapReduce完全不同的设计思想。

MapReduce把计算拆分为Map阶段与Reduce阶段,一个Job两阶段完成就完成了,采用的是分治思想,对于复杂的计算时可能需要多次串联多个MR生成多个Job来完成。

而Spark则不同,采用的是内存迭代式计算,也就是一个Job它可以分为n个阶段(Stage),Stage划分的依据是Shuffle,无论多复杂的计算大部分都可以在多个Job中完成所有的Stage,且Shuffle时除非内存不足时才会Spill写磁盘,否则子Stage的RDD会根据Shuffle的算法从父Stage的Partition中拿到属于自己Partition的数据,生成新的RDD,所有迭代计算都是在内存中完成,效率就比较高。

调度系统——上面基于内存的迭代式计算主要就依赖于DAGScheduler与TaskScheduler来分配计算资源!

Spark对于DAG与Task的实现以及不同执行阶段的划分和任务的提交执行,充分体现了其设计的优雅与高效!

 

spark集群中Driver、Cluster Manager、Worker、Executor之间在的关系图:

————第四章 Scheduler模块详解————

 

任务调度整体架构

将用户提交的计算任务按照DAG划分不同的阶段的计算任务提交到集群进行最终的计算

RDD Objects:从RDDS中构建DAG。实际代码中创建的RDD,是DAG的基本元素,Spark提供了丰富的Transformation和Action算子来对它进行操作。

DAGScheduler:对DAG进行任务切分,生成TaskSet。分析用户提交的应用、并根据计算任务的依赖关系建立 DAG,然后将DAG划分为不同的Stage阶段,其中每个Stage由可以并发进行执行的一组Task任务构成,所有Task的执行逻辑完全相同,只是处理的数据不一样。DAG无论在任何部署方式下都是相同的。在DAGScheduler将Task任务划分完成后,将Task组提交给TaskScheduler。

TaskScheduler:启动任务并调度任务(就近原则)。收到任务集后,TaskScheduler通过Cluster Manager在集群的某个Worker的Executor上启动任务。任务完成后将结果保存到指定地方或回传到Driver。

TaskScheduler为创建它的SparkContext调度任务,从DAGScheduler接收不同Stage的任务,并向集群提交这些任务,并为执行得特别慢的任务启动备份任务。

TaskScheduler的具体实现交给TaskSchedulerImpl实现。

TaskSchedulerImpl会在以下4种情况调用SchedulerBackend的reviveOffers为其分配计算资源:

  1. 有新任务提交时
  2. 有任务执行失败时
  3. 计算节点Executor不可用时
  4. 某些任务运行过慢需要为其重新分配资源时。

Executor:任务执行

最核心的三个类:

  • org.apache.spark.scheduler.DAGScheduler
  • org.apache.spark.scheduler.SchedulerBackend
  • org.apache.spark.scheduler.TaskScheduler

 

SchedulerBackend:分配当前可用计算资源

为Task分配Executor计算资源,并在所分配的Executor上启动Task,完成计算的调度过程。reviveOffers是具体实现的方法。

每个SchedulerBackend都会对应一个唯一的TaskScheduler,它们被SparkContext创建和持有。

 

 

Scheduler的两个阶段详解


 

一、DAGScheduler阶段

  1. 从调用sparkContext.runJob()来提交job开始

  2. sc.runJob再调用DAGScheduler中的runJob()

  3. DAGScheduler开始处理用户提交的Job:

    • 根据DAG中的依赖关系图划分Stage链图,向TaskScheduler提交Task

    • 取出需要计算的Partition,为每个Partition生成相应的Task,并将它放到TaskSet集中,每个Task负责处理一个Partition,按照它的关系顺序执行

    • 等待TaskScheduler最终向集群提交TaskSet,并监听这些Task的状态。

DAGScheduler可以简单的理解为计算前的准备阶段,分析用户的RDD Objects生成优化后的DAG图,为下一阶段的TaskScheduler做准备。

 

DAG调度图

 

 

stage划分算法

stage划分算法非常重要,精通spark,必须对stage划分算法很清晰,知道自己编写的spark程序被划分为几个job,每个job被划分为几个stage,每个stage包含了哪些代码,只有知道每个stage包括哪些代码后。在线上,如果发现某个stage执行特别慢,或者某个stage一直报错,才能针对特定的stage包含的代码排查问题,或性能调优。

stage划分算法总结:

1.从finalstage倒推(通过 栈 数据结构实现)

2.通过宽依赖,进行stage的划分

3.通过递归,优先提交父stage

/**
* 获取某个stage的父stage
* 对于一个stage,如果它的最后一个RDD的所有依赖都是窄依赖,将不会创建新的stage
* 如果其RDD会依赖某个RDD,用宽依赖的RDD创建一个新的stage,并立即返回这个stage
* @type {[type]}
*/
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
            //遍历RDD的依赖,对于每种具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底层对应了3个RDD:
            //Map
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                  //使用宽依赖的RDD创建一个 ShuffleMapStage,并且将isShuffleMap 设置为true,
                  //默认最后一个stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              
                //如果是窄依赖
              case narrowDep: NarrowDependency[_] =>
              //将依赖的RDD放入栈中
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    //
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
    //
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

 

 

 


 

 

二、TaskScheduler阶段:任务调度过程

 

每个Task Scheduler都对应一个SchedulerBackend。也就是最终交给SchedulerBackend去执行相应的任务。SchedulerBackend直接与Cluster Manager进行交互,取得用户Application分配到的资源,并将执行资源交给Task scheduler去进行分配。

TaskScheduler任务职责:

  • 负责各个用户Application的不同Job之间进行任务调度
  • 对Task执行失败或超时情况下启动重试机制
  • 对执行慢的Task启动备用任务,若超时会交给其它新的Executor执行此Task

  1. 收到DAGScheduler传来的TaskSet后,启动TaskScheduler.submitTasks开始对Task进行资源调度。并交给SchedulerBackend去执行相应的任务。SchedulerBackend直接与Cluster Manager进行交互,取得用户Application分配到的资源,并将执行资源交给Task scheduler去进行分配。
  2. 为Task分配Executor:其中1-6是在Driver端完成,7-8是在Executor上完成

步骤1:TaskScheduler.submitTasks   将保存这组任务的TaskSet加入到一个TaskSetManager中

val manger = new TaskSetManager(this, taskSet, maxTaskFailures)

TaskSetManager任务:

A、按就近原则为Task分配计算资源:

       即在DAGScheduler获取Stage结果时,处理ShuffleDependencies时就利用ShuffleMapTask调用registerMapOutputs来获取到Driver端进行任务提交时数据的所在位置、大小等元数据信息

B、监控Task的执行状态并采取相应的方法:失败重试、慢任务的推测性执行或启动新的Executor去执行

步骤2:确定每个Application的高度策略并确定Task运行的Executor

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  • schedulableBuilder是Application级别的调度器,支持两种调度策略:FIFO(先进先出,first in first out)和FAIR(公平调度)

           调度策略可以在spark.scheduler.mode中进行设置。新版的已经使用FAIR默认。

  • schedulableBuilder还会确定TaskSetManager的调度顺序
  • 由TaskSetManager根据就近原则分配Task运行所在的Executor

步骤3-4:reviveOffers()请求资源、调用makeOffer()

通过makeOffer()得到一批可执行的任务描述, 调用 launchTasks。

步骤5:resourceOffers()为每个Task具体分配资源

响应CoarseGrainedSchedulerBackend的资源调度请求,为每个Task分配资源。接受一个Executor列表,输出一个org.apache.spark.scheduler.TaskDescription的二维数组。里面包括了Task ID、Executor ID和Task执行环境的依赖信息等。

步骤6:将上个调用得到的tasks发送到Executor上执行。

通过makeOffer()得到一批可执行的任务描述, 调用 launchTasks。剩下的步骤就是在执行Task了。

 

 


三、任务调度的两种策略

 

先进先出FIFOSchedulingAlgorithm算法

首先保证Job ID较小的先被调度,如果是同一个Job,那么Stage ID小的先被调度(同一个Job,可能是多个Stage并行执行。)

FIFO算法代码与分析

在这个类里面,主要逻辑是一个comparator方法。

  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority   //实际上是Job ID
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) { //如果Job ID相同,就比较Stage ID
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }

如果有两个调度任务s1和s2,首先获得两个任务的priority,在FIFO中该优先级实际上是Job ID。首先比较两个任务的Job ID,如果priority1比priority2小,那么返回true,表示s1的优先级比s2的高。我们知道Job ID是顺序生成的,先生成的Job ID比较小,所以先提交的job肯定比后提交的job先执行。但是如果是同一个job的不同任务,接下来就比较各自的Stage ID,类似于比较Job ID,Stage ID小的优先级高。

 

公平调度FairSchedulingAlgorithm算法

通过xml配置文件的策略来调度

spark后再conf目录下有一个fairscheduler.xml.template文件,把此文件复制一份:

#cp fairscheduler.xml.template    fairscheduler.xml

#cat fairscheduler.xml



<?xml version="1.0"?>

<!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->


<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>


 

(1)name: 该调度池的名称,可根据该参数使用指定pool,入sc.setLocalProperty("spark.scheduler.pool", "test") 
(2)weight: 该调度池的权重,各调度池根据该参数分配系统资源。每个调度池得到的资源数为weight / sum(weight),weight为2的分配到的资源为weight为1的两倍。 
(3)minShare: 该调度池需要的最小资源数(CPU核数)。fair调度器首先会尝试为每个调度池分配最少minShare资源,然后剩余资源才会按照weight大小继续分配。 
(4)schedulingMode: 该调度池内的调度模式。

 

修改完fairscheduler.xml文件,还需要配置spark-default.conf,添加如下内容:

#cat spark-default.conf

spark.scheduler.mode  FAIR
spark.scheduler.allocation.file /data/spark-2.2.0-bin-hadoop2.7/conf/fairscheduler.xml

可以在fairscheduler.xml文件中添加多个调度池,配置不同的weight、minShare来控制,使用调度池要显示指定:

SET spark.sql.thriftserver.scheduler.pool=default;

或在代码中指定:

sc.setLocalProperty("spark.scheduler.pool", "test") 

公平算法代码

comparator方法

  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare //在这里share理解成份额,即每个调度池要求的最少cpu核数
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks // 该Pool或者TaskSetManager中正在运行的任务数
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1 // 如果正在运行任务数比该调度池最小cpu核数要小
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
 
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
 
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }

 

两种调度的图示

 


 

一个job的完整调用过程

dagScheduler.runJob //(1)
--> submitJob ( eventProcessLoop.post(JobSubmitted,***) //(2)
    --> eventProcessLoop //(3)
        --> onReceive(event: DAGSchedulerEvent) //(4)
            --> doOnReceive(event: DAGSchedulerEvent) //(5)
                --> case JobSubmitted //(6)
                    --> dagScheduler.handleJobSubmitted //(7)
                        --> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8)    
                        --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9)
                        --> jobIdToActiveJob(jobId) = job //(10)
                        --> activeJobs += job //(11)
                        --> finalStage.setActiveJob(job) //(12)
                        --> stageIds = jobIdToStageIds(jobId).toArray //(13)
                        --> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14)
                        --> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15)
                        --> submitStage(finalStage) //(16)
                            --> getMissingParentStages(stage).sortBy(_.id) //(17)
                                --> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18)
                                    --> createShuffleMapStage(dep, firstJobId) //(19)
                                        -->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
                                --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20)                    
                                --> submitStage(finalStage)  //(21)//划分和提交stage算法精髓
                                    --> submitMissingTasks(stage, jobId.get) //(22)
                                        --> submitWaitingChildStages(stage) //(23)
                                --> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))  //(24)

(1)所有的action算子都会触发一个job的调度,经过多次不同的runjob重载后停在这里调度 submitJob

(2)调用eventProcessLoop方法,并发送 JobSubmitted 消息给DAGSchedulerEventProcessLoop(DAGScheduler的循环响应函数体)

(3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

(4)onReceive 函数是接受 DAGSchedulerEventProcessLoop DAG调度程序的事件接受函数

(5)doOnReceive 实际是步骤4的事件处理函数

(6)根据步骤2的发送事件,触发 JobSubmitted 这个事件响应

(7)dagScheduler 的核心入口

(8)使用触发的job的最后一个RDD创建一个 finalstage,并且放入内存缓存中 stageIdToStage

(9)使用 finalStage 创建一个job。这个job最后一个stage就是final stage

(10)(11)(12)(13)(14)(15)把 job 加入各种内存缓存中,其实就是各个数据结构

(16)提交finalStage。总是从最后开始往前推测。

(17)获取当前stage的父stage。stage的划分算法,主要在这里。waitingForVisit = new Stack[RDD[_]]。栈结构,从最后的stage往前的stage 放进栈中,实现先进后出。符合程序调用顺序。

(18)获取最后一个stage,finalstage

(19)生成一个 ShuffleMapStage

(20)利用finalestage 生成一个job

(21)划分和提交stage算法精髓,划分好stage之后全部放在waiting stage 数据结构中

(22)提交所有在 waiting stage 中的stage,从stage0...finalstage

(23)检查等待的阶段,现在有资格重新提交。提交依赖于给定父级阶段的阶段。当父阶段完成时调用成功

(24)所有的stage划分完并提交结束

 

 

 


 

 

参考:

《Spark技术内幕》

Spark调度模式-FIFO和FAIR

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark技术内幕读书笔记:Spark核心——DAGScheduler任务切分调度与TaskScheduler任务执行调度计算过程详解 的相关文章

随机推荐