甚至我一直在网上查找 Spark 如何从 RDD 计算 DAG 并随后执行任务。
在较高级别上,当在 RDD 上调用任何操作时,Spark 会创建 DAG 并将其提交给 DAG 调度程序。
我们来看看Spark是如何构建DAG的。
在高层次上,有两种转换可以应用于 RDD,即狭义转型和广义转型。广泛的转变基本上会导致阶段边界。
狭义转型- 不需要在分区之间移动数据。例如,地图、过滤器等。
广泛的转变- 需要对数据进行混洗,例如,reduceByKey 等。
让我们举一个例子来计算每个严重级别出现的日志消息数量,
以下是以严重级别开头的日志文件,
INFO I'm Info message
WARN I'm a Warn message
INFO I'm another Info message
并创建以下 scala 代码来提取相同的内容,
val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
.map(words => (words(0), 1))
.reduceByKey{(a,b) => a + b}
此命令序列隐式定义了 RDD 对象(RDD 谱系)的 DAG,稍后调用操作时将使用该 DAG。每个 RDD 都维护一个指向一个或多个父代的指针,以及有关其与父代关系类型的元数据。例如,当我们调用val b = a.map()
在 RDD 上,RDDb
保留对其父级的引用a
,这是一个血统。
为了显示 RDD 的沿袭,Spark 提供了一种调试方法toDebugString()
。例如执行toDebugString()
on the splitedLines
RDD,将输出以下内容:
(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
| MapPartitionsRDD[4] at map at <console>:23 []
| log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
| log.txt HadoopRDD[0] at textFile at <console>:21 []
第一行(从底部开始)显示输入 RDD。我们通过调用创建了这个 RDDsc.textFile()
。下面是根据给定 RDD 创建的 DAG 图的更直观的视图。
一旦 DAG 构建完成,Spark 调度程序就会创建一个物理执行计划。如上所述,DAG 调度程序将图拆分为多个阶段,阶段是根据转换创建的。狭窄的转换将被分组(管道式)到一个阶段。因此,对于我们的示例,Spark 将创建两个阶段执行,如下所示:
然后,DAG 调度程序会将阶段提交到任务调度程序中。提交的任务数量取决于文本文件中存在的分区数量。 Fox 示例假设我们在此示例中有 4 个分区,那么只要有足够的从属/核心,就会并行创建和提交 4 组任务。下图更详细地说明了这一点:
有关更多详细信息,我建议您观看以下 YouTube 视频,其中 Spark 创建者提供了有关 DAG 以及执行计划和生命周期的深入详细信息。
- 高级 Apache Spark- Sameer Farooqui (Databricks) https://www.youtube.com/watch?v=7ooZ4S7Ay6Y
- 深入了解 Spark 内部结构 - Aaron Davidson (Databricks) https://www.youtube.com/watch?v=dmL0N3qfSc8
- AmpLab Spark 内部结构简介 https://www.youtube.com/watch?v=49Hr5xZyTEA