高性能Spark作业调优

2023-11-02

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。

然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。

Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。

笔者根据之前的Spark作业开发经验以及实践积累,总结出了一套Spark作业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分:

  • 开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;

  • 数据倾斜调优,主要讲解了一套完整的用来解决Spark作业数据倾斜的解决方案;

  • shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同学,主要讲解了如何对Spark作业的shuffle运行过程以及细节进行调优。

本文主要讲解开发调优和资源调优。

一、开发调优

Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。而开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。

在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。

原则一:避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD,以此类推,循环往复,直到计算出最终我们需要的结果。

在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

一个简单的例子

//也就是说,需要对一份数据执行两次算子操作。

//错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。

//这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,

//然后分别对每个RDD都执行了一个算子操作。

//这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;

//第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd1.map(...)

val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd2.reduce(...)

//正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。

//这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,

//然后对这一个RDD执行了多次算子操作。

//但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,

//还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。

//要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,

//才能保证一个RDD被多次使用时只被计算一次。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")

rdd1.map(...)

原则二:尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。

比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的,那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。

对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

一个简单的例子

// 错误的做法。

// 有一个<Long, String>格式的RDD,即rdd1。

// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。

JavaPairRDD<Long, String> rdd1 = ...

JavaRDD<String> rdd2 = rdd1.map(...)

 // 分别对rdd1和rdd2执行了不同的算子操作。

rdd1.reduceByKey(...)

rdd2.map(...)

 // 正确的做法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。

// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。

// 其实在这种情况下完全可以复用同一个RDD。

// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。

// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。

JavaPairRDD<Long, String> rdd1 = ...

rdd1.reduceByKey(...)

rdd1.map(tuple._2...)

 // 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。

// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。

// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。

原则三:对多次使用的RDD进行持久化

当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。

这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

对多次使用的RDD进行持久化的代码示例

// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。

// 正确的做法。

// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。

// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。

// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()

rdd1.map(...)

rdd1.reduce(...)

 // persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。

// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。

// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。

// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。

val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)

rdd1.map(...)

rdd1.reduce(...)

对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。

Spark的持久化级别

如何选择一种最合适的持久化策略:

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其它节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

  • 通常不建议使用DISK_ONLY和后缀为_2的级别。因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其它节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

原则四:尽量避免使用shuffle类算子

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其它节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

所以在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast与map进行join代码示例

// 传统的join操作会导致shuffle操作。

// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。

val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不会导致shuffle操作。

// 使用Broadcast将一个数据量较小的RDD作为广播变量。

val rdd2Data = rdd2.collect()

val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。

// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。

// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。

val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。

// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

原则五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。

map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其它节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。

通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如以下两幅图就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

原则六:使用高性能的算子

除了shuffle相关的算子有优化原则之外,其它的算子也都有着相应的优化原则:

  • 使用reduceByKey/aggregateByKey替代groupByKey。

    详情见“原则五:使用map-side预聚合的shuffle操作”。

  • 使用mapPartitions替代普通map。

    mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

  • 使用foreachPartitions替代foreach。

    原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

  • 使用filter之后进行coalesce操作。

    通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。

    repartitionAndSortWithinPartitions是Spark官网推荐的一个算子。官方建议,如果是需要在repartition重分区之后还要进行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

原则七:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。

广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

广播大变量的代码示例

// 以下代码在算子函数中,使用了外部的变量。

// 此时没有做任何特殊操作,每个task都会有一份list1的副本。

val list1 = ...

rdd1.map(list1...)

// 以下代码将list1封装成了Broadcast类型的广播变量。

// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。

// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。

// 每个Executor内存中,就只会驻留一份广播变量副本。

val list1 = ...

val list1Broadcast = sc.broadcast(list1)

rdd1.map(list1Broadcast...)

原则八:使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。

  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD、Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。

Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。

官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。

val conf = new SparkConf().setMaster(...).setAppName(...)

// 设置序列化器为KryoSerializer。

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 注册要序列化的自定义类型。

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。

  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

但在实践中,我们往往会发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。

同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。

因此笔者建议,可以在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

二、资源调优

在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者通常不知道该设置哪些必要的参数以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。

资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。

总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

1、Spark作业基本运行原理

详细原理如下图所示:

我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。

根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。

YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。

Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。

task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。

如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。

如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。

因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:

  • 第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;

  • 第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;

  • 第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合原理图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

2、资源参数调优


了解完Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数来优化资源使用的效率,从而提升Spark作业的执行性能。

以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值:

num-executors

参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常也有直接的关联。

参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

参数说明:该参数用于设置Driver进程的内存。

参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源。因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

3、资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \

  --master yarn-cluster \

  --num-executors 100 \

  --executor-memory 6G \

  --executor-cores 4 \

  --driver-memory 1G \

  --conf spark.default.parallelism=1000 \

  --conf spark.storage.memoryFraction=0.5 \

  --conf spark.shuffle.memoryFraction=0.3 \

根据实践经验来看,大部分Spark作业经过本次基础篇所讲解的开发调优与资源调优之后,一般都能以较高的性能运行了,足以满足我们的需求。但在不同的生产环境和项目背景下,还是有可能会遇到其他更加棘手的问题(比如各种数据倾斜),也可能会遇到更高的性能要求。

为了应对这些挑战,需要使用更高级的技巧来处理这类问题。在后续文章中,我们会详细讲解数据倾斜调优以及Shuffle调优。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

高性能Spark作业调优 的相关文章

  • 数据仓库主题三-(实施篇)

    背景 如何从具体的需求或项目转换为可实施的解决方案 如何进行需求分析 架构设计 详细模型设计等 则是模型实施过程中讨论的内容 业界常用两种数据仓库建设模型思想分为两种kimball和inmon模型 具体的kimball和inmon 模型思想
  • cdh下spark2-yarn运行sparkstreaming获取kafka数据使用spark-streaming-kafka-0-10_2.11报错解决

    报错问题 20 07 15 17 20 51 INFO utils AppInfoParser Kafka version 0 9 0 kafka 2 0 0 20 07 15 17 20 51 INFO utils AppInfoPars
  • Spark DataFrame的Join操作和withColumn、withColumnRenamed方法实践案例(Scala Demo代码)

    import org apache log4j Level Logger import org apache spark sql SparkSession import org apache spark sql functions obje
  • java中使用spark如何将column多列合为一列

    接下来介绍几种使用spark将DataFrame中一行的多列合并到一列中 并且该列以不同的类型展示保存 1 建立dataset 自己需要连接的mongo库 private static String datasource 自己需要连接的mo
  • Hive千亿级数据倾斜解决方案

    数据倾斜问题剖析 数据倾斜是分布式系统不可避免的问题 任何分布式系统都有几率发生数据倾斜 但有些小伙伴在平时工作中感知不是很明显 这里要注意本篇文章的标题 千亿级数据 为什么说千亿级 因为如果一个任务的数据量只有几百万 它即使发生了数据倾斜
  • 第四讲 系统建模方法

    系统建模一般流程 实际系统的分析方法大致类似 但对于实际系统的模型实现方式则有多种 1 面向方程 图框 的因果建模 基于图框进行系统模型实现 的软件很多 如Simulink AMESim 应用于控制领域 等 2 面向对象 非因果 的物理建模
  • 2022年第十四届电工杯赛题分析

    作为2022年上半年最后一场建模比赛的电工杯 为期72个小时 在众多建模比赛中 电工杯属于难度大 比赛时间短 知名度大的比赛 在我个人看来 电工杯可以作为美赛国赛之下第二梯队建模比赛里的头号比赛 因此 为了更好选题比赛 本次比赛我将在本篇文
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • 11.Linux下Spark的安装配置以及spark-shell的启动和 Spark集群环境搭建

    本案例软件包 链接 https pan baidu com s 1zABhjj2umontXe2CYBW DQ 提取码 1123 若链接失效在下面评论 我会及时更新 目录 1 安装Spark 1 先用xftp将安装包传到home hadoo
  • 2020-10-24 大数据面试问题

    上周面试数据开发职位主要从公司的视角讲一下记录下面试流水 1 三面技术一轮hr 面到了cto 整体来看是这一周技术含量最高信息量最大的一个 1到4轮过了4个小时 技术上的问题主要问的对数据分层的理解 1 一面自我介绍 目前团队的规模多大 2
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • spark hadoop环境及运行

    hadoop配置 在Ubuntu20 04里安装Hadoop详细步骤 图文 亲测成功 ubuntu20 04安装hadoop 菜鸡的学习之路的博客 CSDN博客 启动hadoop root ubuntu usr local hadoop s
  • 使用Minitab解决Excel的限制问题

    前两天 当我在做数据转置以期获得更好分析图形的时候 我碰到了传说中excel的限制 如图 在解决这个问题的过程中 我发现了用来做数据分析 比excel更好的工具Minitab 打开minitab 把纪录数据的Excel表以File gt O
  • spark SQL基础教程

    1 sparkSQL入门 sparksql专门用于处理结构化的数据 而RDD还可以处理非结构化的数据 sparksql的优点之一是sparkfsql使用统一的api读取不同的数据 第二个优点是可以在语言中使用其他语言 例如python 另外
  • Spark Sql之dropDuplicates去重

    文章目录 算子介绍 示例 问题 解决 dropDuplicates和distinct 参考 算子介绍 dropDuplicates去重原则 按数据行的顺序保留每行数据出现的第一条 dropDuplicates 在Spark源码里面提供了以下

随机推荐

  • keil5烧录或下载程序出现停止工作的问题

    本人在使用keil5烧录或下载程序出现停止工作的问题 开始认为是keil版本的原因 后来使用了keil4发现问题依然存在 发现因为本人使用了盗版JLINK被驱动检测出来了 由于安装的驱动版本为V6 14 新版的驱动检测到盗版JLINK 一旦
  • 1014 Waiting in Line (30)

    题目描述 Suppose a bank has N windows open for service There is a yellow line in front of the windows which devides the wait
  • 成为FISCO BCOS MVP,并肩链上创未来

    开源以来 FISCO BCOS受到众多开发者的支持 支撑生态内企业数百个应用项目的研发 其中 超120个应用投入使用 目前开源社区已汇聚了超40000名开发者 既有在区块链路上探索实践的开发者 也有自成一派 颇有建树的技术大牛 大家聚集于此
  • ESP32-C3 学习测试 蓝牙 篇(七、GATT 数据通信 — 发送自定义数据)

    前面我们已经入门了 GATT 的开发 更进一步 进行想要的数据通信 目录 前言 1 通信问题思考 2 如何才能每次传输不同的数据 3 对 handle 的认识 4 继续尝试 5 测试成功 结语 前言 本来计划直接做一个蓝牙的小应用 首先得实
  • 【Unity故障】Unityhub登录界面白屏(刷新不出那种感觉)

    平时打开UnityHub可能也就是偶尔需重新激活一下许可证 今天进去发现每个工程后都一个黄色感叹号标志 看了下账号也没登状态 当我以为登个账号就解决 然后就一直卡在这个界面 好像在哪看见过这个问题直接百度搜 资料很多 前人已经帮我们铺平了道
  • java.lang.IllegalArgumentException: jdbcUrl is required with driverClassName.

    springboot 配置多数据源时 启动出现java lang IllegalArgumentException jdbcUrl is required with driverClassName 修改 spring datasource
  • HIVE解析JSON数组

    HIVE解析JSON数组 数据示例 payAmount 375000 payChannelCode BOC payAmount 376000 payChannelCode AOC 1 get json object函数提取json数组里面特
  • 快速排序算法讲解及代码(详细)

    快速排序算法 一 序言 二 快速排序基本思想 三 具体步骤 四 具体代码 一 序言 快速排序是一种高效且使用广泛的排序算法 在很多语言的标准库中自带的排序都是快速排序 所以我们也有必要了解快排的原理以及实现方法 二 快速排序基本思想 算法思
  • C51单片机学习1

    工程的建立与烧录 一 工程的建立 1 在想要建立工程的地点新建一个文件夹 2 打开C51 选择新建项目 3 点击保存 4 选择芯片AT89C52 选好点击OK 5 破解版的一个bug 点击否就好了 6 然后工程就建立好了 二 工程的配置 1
  • FPGA引脚简介

    一 FPGA的硬件结构 这里说的硬件结构主要从FPGA的引脚功能方面描述 FPGA内部的电路逻辑暂不讨论 FPGA的引脚大致可以分为三类 功能引脚 IO引脚 电源和接地引脚 1 功能引脚 FPGA的功能引脚包含了FPGA配置程序加载 FPG
  • 如何自定义FileZilla编辑文件的默认打开方式

    需求来源于 用FileZilla编辑远程的文件时 每次打开居然不会提示我 打开方式 而是直接跳到了网页 以网页的形式展现 这样我怎么好进行编辑其中的内容 特别是对于xml文件 我们通常需要对它进行配置 可是 如果打开方式是网页 比如 直接跳
  • vue项目引入element-ui的三种方式

    1 element ui插件引用 推荐 通过执行命令vue add element来进行引入 2 npm install 此种方式为全量引入element ui 打包体积大 通过执行命令npm i element ui S安装依赖 main
  • linux超级用户添加,为linux普通用户添加超级用户权限sudo

    问题 假设用户名为 ali 如果用户名没有超级用户权限 当输入 sudo 命令 时 系统提示 ali is not in the sudoers file This incident will be reported 解决 1 进入超级用户
  • unity editer2020版本安装

    unity editer2020版本安装 前言 unity安装 激活许可证 前言 官方下载unity最新版本 unity安装 1 点击下载 2 点击 下载Unity Hub 3 由于我是windows10系统 所以点击 Windows 下载
  • vue表格展示照片点击放大并可左右切换查看

    一 vue展示后台返回照片集合 前端代码展示
  • 基于机器学习的恶意软件加密流量检测研究分享

    1 概述 2 恶意软件加密流量介绍 3 加密HTTPS流量解析 4 特征工程 5 模型效果 6 具体实施 7 总结 1 概述 近年来随着HTTPS的全面普及 为了确保通信安全和隐私 越来越多的网络流量开始采用HTTPS加密 截止今日 超过6
  • 深度学习模型推理时间与FPS的求取方法,以及time,OpenCV的API教程

    类似深度学习中目标检测的深度学习模型中有两个非常重要的性能指标 一个是MAP就是检测的准确率 另一个就是FPS 就是模型的推理速度 那么我们如何能够知道模型和视频的推理速度呢 接下来我们直接进入正题 一 求取模型的推理时间我们需要借助pyt
  • C语言支不支持重载?

    首先这个问题的答案是C 支持函数重载而C语言不支持函数重载 下面我们从程序编译链接阶段看看其中的原因 先看看重载的定义 函数重载就是指 在同一作用域类 一组函数的函数名相同 参数列表不同 个数不同或类型不同 返回值可同可不同 那么问题来了
  • 风火速打印小程序分析

    这里写自定义目录标题 软件需求背景 分析风火速功能 功能介绍 总结 软件需求背景 类似淘宝 京东第三方卖家需要一个OMS系统 并提供辅助的快递订单打印功能 现在菜鸟物流已标准化了各家快递公司的电子面单 也可以独立对接快递公司 商家需求提供一
  • 高性能Spark作业调优

    在大数据计算领域 Spark已经成为了越来越流行 越来越受欢迎的计算平台之一 Spark的功能涵盖了大数据领域的离线批处理 SQL类处理 流式 实时计算 机器学习 图计算等各种不同类型的计算操作 应用范围与前景非常广泛 在美团点评 已经有很