Spark 任务调度机制

2023-11-19

1.Spark任务提交流程

Spark YARN-Cluster模式下的任务提交流程,如下图所示:

 图YARN-Cluster任务提交流程

下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程:

图Spark任务提交时序图

提交一个Spark应用程序,首先通过Client向ResourceManager请求启动一个Application,同时检查是否有足够的资源满足Application的需求,如果资源条件满足,则准备ApplicationMaster的启动上下文,交给ResourceManager,并循环监控Application状态。

当提交的资源队列中有资源时,ResourceManager会在某个NodeManager上启动ApplicationMaster进程,ApplicationMaster会单独启动Driver后台线程,当Driver启动后,ApplicationMaster会通过本地的RPC连接Driver,并开始向ResourceManager申请Container资源运行Executor进程(一个Executor对应与一个Container),当ResourceManager返回Container资源,ApplicationMaster则在对应的Container上启动Executor。

Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

从上述时序图可知,Client只负责提交Application并监控Application的状态。对于Spark的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是通过ApplicationMaster、Driver以及Executor之间来完成。

2.Spark任务调度概述

当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源情况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:

  1. Job是以Action方法为界,遇到一个Action方法则触发一个Job;
  2. Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
  3. Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度,总体调度流程如下图所示:

 Spark任务调度概览

Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。有了上述感性的认识后,下面这张图描述了Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver以及Executor内部模块的交互过程:

 模块交互过程

 Job提交和Task拆分

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。

3.Spark Stage级调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

 Job提交调用栈

Job由最终的RDD和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算,如上图紫色流程部分。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。

 WordCount实例

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,即ShuffleMapStage中,实际执行的时候,数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索算法。

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

4.Spark Task级调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。

 TaskManager结构

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务

 前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余量,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示:

 

ask调度流程

        将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task

​​​​​​​4.1调度策略

前面讲到,TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里,然后再从任务队列里按照一定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上还是比较粗粒度的,是面向TaskSetManager的

TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schdulable,叶子节点为TaskSetManager,非叶子节点为Pool,下图是它们之间的继承关系。

 

任务队列继承关系

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,表示树的根节点,是Pool类型。

  1. FIFO调度策略

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。

 

FIFO调度策略内存结构

  1. FAIR调度策略

FAIR调度策略的树结构如下图所示:

FAIR调度策略内存结构

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。

在FAIR模式中需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetManager都继承了Schedulable特质,因此使用相同的排序算法

排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

  1. 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行
  2. 如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行
  3. 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行
  4. 如果上述比较均相等,则比较名字。

整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行

FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

​​​​​​​4.2本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()得到partition 的优先位置,由于一个partition对应一个task,此partition的优先位置就是task的优先位置对于要提交到TaskScheduler的TaskSet中的每一个task,该task优先位置与其对应的partition对应的优先位置一致

从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的所有task,并负责管理调度这些task。

根据每个task的优先位置,确定task的Locality级别,Locality一共有五种,优先级由高到低顺序:

Spark本地化等级

名称

解析

PROCESS_LOCAL

进程本地化,task和数据在同一个Executor中,性能最好。

NODE_LOCAL

节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。

RACK_LOCAL

机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。

NO_PREF

对于task来说,从哪里获取都一样,没有好坏之分。

ANY

task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。

​​​​​​​4.3失败重试与黑名单机制

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败

 模块交互过程

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 任务调度机制 的相关文章

随机推荐

  • u盘安装ubuntu问题:卡在引导界面不动

    问题 一直卡在如图界面不动 分析 既然一直提示syslinux 那我们就看看他是什么东西吧 原因 syslinux分区引导记录问题 解决方案1 安装bootice软件 将制作好的启动盘插入电脑 用bootice更改syslinux引导记录
  • 8.10:如何在Python中判断文件类型?

    在计算机科学领域中 文件类型判断是一个非常基础和重要的问题 不同类型的文件需要采取不同的处理方式 因此在处理文件时 我们需要准确地判断文件类型 Python作为一门流行的编程语言 提供了许多方法来判断文件类型 在本文中 我们将介绍几种常见的
  • @vitejsplugin-vue requires vue (>=3.2.13) or @vuecompiler-sfc to be present in the dependency tree

    运行项目的时候 首先会提示要安装 vue compiler sfc 但是安装后运行项目成功但是页面是空白并且报错 VUE HMR RUNTIME is not defined 摸索了半天 查看到package json依赖文件 没有vue
  • RTX线程通信之——线程标志

    文章目录 Thread Flags 概念 RTX线程标志API 案例 LED灯同步闪亮 小结 参考资料 Thread Flags In a real application we need to be able to communicate
  • mbedtls 入门第四课--移植mbedtls到VS和ESP8266--8266SDK SHA256移植

    承接上篇 我们初步了解了mbedtls的文件路径以及文件作用以后就是想着如何将mbedtls移植到各种平台 博主这里只有两种移植方法 第一是将代码移植到VS中 第二个是将代码移植到博主跑动的比较多的小众SOC ESP8266 移植到ESP8
  • 【华为OD机试】五子棋迷(C++ Python Java)2023 B卷

    时间限制 C C 1秒 其他语言 2秒 空间限制 C C 262144K 其他语言524288K 64bit IO Format lld 题目描述 张兵和王武是五子棋迷 工作之余经常切磋棋艺 这不 这会儿又下起来了 走了一会儿 轮张兵了 对
  • 技术积累 — Keil 查看内存占用/优化代码

    原文链接 转自Sugar的专栏 转载文章 若有不妥 通知后我会立即删除 一 查看内存占用 1 使用Keil编辑代码时 编译成功后 双击红色框框位置 就会弹出 map文件 2 那么map文件中能够读出哪些信息呢 Program Size Co
  • caffe中lstm的实现以及lstmlayer的理解

    本文地址 http blog csdn net mounty fsc article details 53114698 本文内容 本文描述了Caffe中实现LSTM网络的思路以及LSTM网络层的接口使用方法 本文描述了论文 Long ter
  • 自学软件测试需要多久?怎么自学软件测试?自学软件测试可以找到工作吗? 绝对干货!

    一 前言 最近经常有很多朋友问我想要入行软件测试 但是都不知道该怎么学 这里详细的给大家说下 对于0基础的朋友 应该怎么去学习软件测试 学习软件测试有2条路可以选 1 找个靠谱的培训机构去培训啦 你就什么都不用想了 跟着培训结构认真的学习就
  • Hive Sql执行出错 Dag submit failed due to java.io.IOException: All datanodes DatanodeInfoWithStorage

    原因 根本原因是集群中的一个或多个信息块在所有节点中都已损坏 因此映射无法获取数据 命令 hdfs fsck list corruptfileblocks 可用于识别集群中损坏的块 当数据节点中打开的文件数量较少时 也会出现此问题 解决方案
  • 微信小程序传递数组给服务器,微信小程序页面间的数组如何传递

    A页面 数组 对象都需要stringify var listData JSON stringify that data listData var taskArray JSON stringify that data taskArray wx
  • visual studio2019(C#/.NET)安装教程

    前言 好久没有跟新版本了 博主还用的2017 看到最新的2019功能还是很强大的 版本可能越高越好 所以博主写了一个详细的博客 希望可以帮助到大家 一 visual studio 2019 下载 1 下载地址 visual studio官方
  • 爬取药品监督情况数据

    首先打开国家药品监督局的相应网址 国家药品监督局的相应网址 找到某一家企业点击相应的许可证编号那一个栏目 查看相应的许可证情况 上面对应的内容为我们需要爬取的对应的数据 不确定对上述的网页进行访问的时候 我们能够得到对应的企业名称 许可证编
  • apollo5.5感知模块改进

    apollo5 5感知模块改进 最近一直在研究百度的apollo源码感知部分 下面整理一下近期的一些知识点 apollo5 5在感知部分做了些升级 以适应最新的apollo自动驾驶套件 主要的特征及改进 1 全新的数据Pipline服务以及
  • 射发射整改案例

    文章摘录 http www elecfans com emc emi 1244893 html 试验现象 系统在300K 200MHz辐射发射超标 2 诊断过程 1 使用近场探头置于模块显示屏时 1MHz以下频段超标 则低频电磁干扰来源于显
  • pandas 行、列的增删改查

    读取tips xlsx及预览内容 行的增删改查 增加行 直接赋值 可以只写一个值 也可以写列表 df loc 40 1 df loc 40 1 2 3 4 5 6 7 append方法 append 增加行的方法 设置ignore inde
  • 数据结构--图的遍历(广度优先遍历、深度优先遍历)

    目录 图的遍历 广度优先遍历 BFS 广度优先遍历的代码实现 编辑 广度优先遍历序列 编辑 遍历序列的可变性 编辑 BFS算法完整版 编辑 广度优先遍历复杂度分析 广度优先生成树 广度优先生成森林 回顾广度优先遍历 深度优先遍历 DFS 回
  • Opencv项目实战:00 专栏内容介绍

    目录 Opencv项目实战专栏介绍 01 文字检测OCR 02 角度探测器 03 扫描二维码 条形码 04 全景图片拼接 05 物体检测 06 文档扫描仪 07 人脸识别和考勤系统 08 Yolov3更高精度的检测物体 09 物体尺寸测量
  • react-dnd 拖拽能力教程

    前言 近几年来 低代码 零代码的热度在国内逐年递增 复杂度同力一样不会消失 也不会凭空产生 它总是从一个物体转移到另一个物体或一种形式转为另一种形式 用户在使用低零代码构建应用程序时 这些能力只是被平台研发人员提前编写完了 作为低零代码的基
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交