将map输出作为输入传递给reducer的过程称为shuffle。
Shuffle过程包含在Map和Reduce两端
map阶段大致过程为:
写数据,分区,排序,将属于同一分区的输出合并一起写在磁盘上。
每个map任务都有一个环形内存缓冲区用于存储任务输出。环形内存缓冲区默认大小为100M。
map开始产生输出数据时,先将数据写入缓冲区中,当缓冲区中数据达到阈值(默认为0.8)时,就开始把数据溢出到本地磁盘,溢出的文件成为spill文件。溢出的过程中,map输出的数据会继续写入缓冲区,如果此时缓冲区已经满了,那么map会等待缓冲区有空间时再进行写入。
在将缓冲区的数据写入到磁盘之前,首先会根据数据最终要传的reducer对缓冲区的数据进行分区,在每个分区中,数据按key进行排序(快速排序)。如果有combiner函数,则会在排序后的输出上运行,使得map的输出结果更加紧凑。
把所有溢出的文件进行一次合并,合并成一个已分区且已排序的输出文件。
reduce阶段大致过程为:
复制,合并。
reducer通过HTTP得到输出文件的分区。
reduce有少量的复制线程(默认是5个),可以并行的从map上复制数据。
Reduce可能需要从多个map任务中获取数据,因此只要多个map中的一个完成,reduce便可以从map复制数据。如果map的输出数据较小,会直接复制到内存;否则,map输出被复制到磁盘。当内存缓冲区达到阈值或达到map输出阈值时,合并后溢出写出到磁盘。如果指定combiner,那么会在合并期间运行它以降低写入磁盘的数据量。
最后会排序合并这些溢出文件。
简单总结:
先把数据写入到缓冲区,每次写入之前要计算分区号,当要溢写时,会对数据进行排序,根据key进行排序(可以自己定义按什么来排序),溢写时如果设置了combiner(适合加减场景),会先对溢写的数据进行合并,每次溢写会生成spill文件,将多个spill文件进行合并,合并成总的文件,合并时如果片段数量大于等于3,这个时候combiner会继续运行,如果不大于则不会运行。在maptask输出时,还可以指定将合并后的数据以压缩格式输出。最终将文件写入磁盘。
Reduce阶段,通过shuffle线程拷贝指定分区的数据,拷贝后的数据会先在shuffle线程内存中进行合并,如果内存不够,也会进行多次溢写,最终对所有的数据进行一次归并排序。
1)环形缓冲区:
排序方式:快排+字典序
默认溢写阈值:80%
默认大小:100M
合理的调节环形缓冲区大小以及溢写阈值是一种常见的 MR 优化手段
2)切片机制:
a) 简单地按照文件的内容长度进行切片
b) 切片大小,默认等于 Block 大小
c) 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
切片大小公式:max(0,min(Long_max,blockSize))
一、分区
1. 分区是在MapTask中通过Partitioner来计算分区号
2. Partitioner的初始化
①计算总的分区数partitions,取决于用户设置的reduceTask的数量
②partitions>1,默认尝试获取用户设置Partitioner,如果用户没有定义,那么会使用HashPartitioner
HashPartitioner根据key的hashcode进行计算,相同的key以及hash值相同的key会分到一个区
②partitions<=1,默认初始化一个Partitioner,这个Partitioner计算的所有的区号都为0
3. 注意
通常在Job的设置中,希望将数据分为几个区,就设置reduceTask的数量为对应的数量!
partitions=设置的reduceTask的数量,0<=分区器计算的区号 < partitions
二、排序
1. 排序是MR框架在shuffle阶段自动进行
2. 在MapTask端发生两次排序,在排序时,用户唯一可以控制的是提供一个key的比较器
3. 设置key的比较器
①用户可以自定义key的比较器,自定义的比较器必须是一个RawComparator类型的类
重点是实现compareTo()方法
②用户通过key,让key实现WritableComparable接口,系统自动提供一个比较器
重点是实现compareTo()方法
4. 排序的分类
全排序: 对所有的数据进行排序,指生成一个结果文件,这个结果文件整体有序
部分排序: 最终生成N个结果文件,每个文件内部整体有序
二次排序: 在对key进行比较时,比较的条件为多个
辅助排序: 在进入reduce阶段时,通过比较key是否相同,将相同的key分为1组
三、分组
1. 分组通过分组比较器,对进入reduce的key进行对比,key相同的分为一组,一次性进入Reducer,被调用reduce方法
2. 分组比较器的设置
①用户可以自定义key的分组比较器,自定义的比较器必须是一个RawComparator类型的类
重点是实现compareTo()方法
②如果没有设置key的分组比较器,默认采取在Map阶段排序时,key的比较器
3. Reduce的细节
在进入reduce(),Reducer会自动实例化一个key,value,这个key-value在Redcuer工作期间,一直是一个不变的对象
每次迭代,reducer会把读到的新的key-value的属性值赋值给key-value!
四、Combiner
1. Combiner的本质是一个Reducer,对key-value进行合并,Combiner的作用就是对map端的输出先做一次合并,以减少map和reduce结点之间的数据传输量,以提高网络IO性能。
2. Combiner 和 Reducer的区别
Combiner在shuffle阶段运行
Reducer在reduce阶段运行
3. Combiner适用于 +,-操作,不适合 *,/操作
4. Combiner的运行时机
在MapTask端: ①每次从缓冲区将数据溢写到磁盘之前,如果设置了Combiner,数据会被Combine之后再溢写到磁盘
②在MapTask最后的merge阶段,如果溢写的片段数据>=3,,如果设置了Combiner,在生成最终的数据时,也会先执行Combine之后再溢写到磁盘
在ReduceTask端: ③shuffle线程从多个MapTask读取同一个分区的数据,之后进行合并,在合并时,如果shuffle所使用的内存不够,也会将部分数据临时溢写到磁盘,此时如果设置了Combiner,数据会被Combine之后再溢写到磁盘
5. Combiner的本质目的是为了减少MR在运行期间的磁盘IO和网络IO