MapReduce shuffle过程详解

2023-11-01

一、MapReduce计算模型

我们知道MapReduce计算模型主要由三个阶段构成:Map、shuffle、Reduce。

Map是映射,负责数据的过滤分法,将原始数据转化为键值对;Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。整个MR的大致过程如下:

这里写图片描述

Map和Reduce操作需要我们自己定义相应Map类和Reduce类,以完成我们所需要的化简、合并操作,而shuffle则是系统自动帮我们实现的,了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。

Shuffle过程包含在Map和Reduce两端,即Map shuffleReduce shuffle

二、Map shuffle

在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认),其流程大致如下:

这里写图片描述

Partition

对于map输出的每一个键值对,系统都会给定一个partition,partition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

我们知道每一个Reduce的输出都是有序的,但是将所有Reduce的输出合并到一起却并非是全局有序的,如果要做到全局有序,我们该怎么做呢?最简单的方式,只设置一个Reduce task,但是这样完全发挥不出集群的优势,而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner,用输入数据的最大值除以系统Reduce task数量的商作为分割边界,也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍,这样就能保证执行partition后的数据是整体有序的。

另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

Collector

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:

这里写图片描述

Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M。但不管怎么设置,Kvbuffer的容量都是有限的,键值对和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。

关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。Spill的门限可以通过io.sort.spill.percent,默认是0.8。

Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

Sort

当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

Spill

Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill12.out文件也不一定在同一个目录下。每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:

这里写图片描述

在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:

这里写图片描述

Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

Merge

Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。

Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑的。)

这里写图片描述

然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引,一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。

然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。最终的索引数据仍然输出到Index文件中。

三、Reduce shuffle

在Reduce端,shuffle主要分为复制Map输出、排序合并两个阶段。

Copy

Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后,会通知父TaskTracker状态已经更新,TaskTracker进而通知JobTracker(这些通知在心跳机制中进行)。所以,对于指定作业来说,JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会从此输出对应的TaskTracker上复制输出到本地,而不会等到所有的Map任务结束。

Merge Sort

Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。

当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓冲区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。

以上就是我对shuffle过程的理解,如有不对之处还请指正
参考:
http://www.csdn.net/article/2014-05-19/2819831-TDW-Shuffle/1
http://blog.csdn.net/xiaolang85/article/details/8528892
http://shiyanjun.cn/archives/588.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce shuffle过程详解 的相关文章

  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • Hadoop 安全模式恢复 - 花费太长时间!

    我有一个包含 18 个数据节点的 Hadoop 集群 我在两个多小时前重新启动了名称节点 并且名称节点仍处于安全模式 我一直在寻找为什么这可能花费太长时间 但找不到好的答案 发帖在这里 Hadoop 安全模式恢复 花费大量时间 https
  • 无法从 JAR 文件加载主类

    我有一个 Spark scala 应用程序 我尝试显示一条简单的消息 Hello my App 当我编译它时sbt compile并运行它sbt run没关系 我成功显示了我的消息 但他显示了错误 像这样 Hello my applicat
  • 在 Hadoop 中处理带标头的文件

    我想在 Hadoop 中处理很多文件 每个文件都有一些头信息 后面跟着很多记录 每个记录都存储在固定数量的字节中 对此有何建议 我认为最好的解决方案是编写一个自定义的InputFormat http hadoop apache org co
  • Spark 上的 Hive 2.1.1 - 我应该使用哪个版本的 Spark

    我在跑蜂巢2 1 1 Ubuntu 16 04 上的 hadoop 2 7 3 根据Hive on Spark 入门 https cwiki apache org confluence display Hive Hive on Spark
  • hive查询无法通过jdbc生成结果集

    我是 Hive 和 Hadoop 的新手 在我的教程中 我想将表创建为 import java sql SQLException import java sql Connection import java sql ResultSet im
  • 猪如何过滤不同的对(对)

    我是猪的新手 我有一个 Pig 脚本 它在两个元素之间生成制表符分隔的对 每行一对 例如 John Paul Tom Nik Mark Bill Tom Nik Paul John 我需要过滤掉重复的组合 如果我使用 DISTINCT 我会
  • R+Hadoop:如何从HDFS读取CSV文件并执行mapreduce?

    在以下示例中 small ints to dfs 1 1000 mapreduce input small ints map function k v cbind v v 2 MapReduce函数的数据输入是一个名为small ints的
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • 如何在 Hadoop 中将 String 对象转换为 IntWritable 对象

    我想转换String反对IntWritableHadoop 中的对象 任何过程都可以进行转换 IntWritable value new IntWritable Integer parseInt someString 并处理以下可能性par
  • hive 从两个数组创建映射或键/值对

    我有两个具有相同数量值的数组 它们映射为 1 1 我需要从这两个数组创建一个键 值对或映射 键 值 任何想法或提示都会有帮助 当前表结构 USA WEST NUMBER Street City 135 Pacific Irvine USA
  • Hive - 线程安全的自动递增序列号生成

    我遇到一种情况 需要将记录插入到特定的 Hive 表中 其中一列需要是自动递增的序列号 即在任何时间点都必须严格遵循 max value 1 规则 记录从许多并行的 Hive 作业插入到这个特定的表中 这些作业每天 每周 每月批量运行 现在
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • 处理 oozie 工作流程中的循环

    我有一个 oozie 用例 用于检查输入数据可用性并根据数据可用性触发 MapReduce 作业 所以我编写了一个 shell 脚本来检查输入数据 并在 oozie 中为其创建了一个 ssh 操作 输入数据检查的重试次数和重试间隔应该是可配
  • Flume将数据从MySQL迁移到Hadoop

    请分享您的想法 需求是将MySQL db中的数据迁移到Hadoop HBase进行分析 数据应该实时或接近实时地迁移 Flume可以支持这个吗 有什么更好的方法 据我了解 Flume 并不是为此而设计的 Flume 基本上用于读取日志 如数
  • 在 Windows 7 64 位中删除 Spark 临时目录时出现异常

    我正在尝试在 Windows 7 64 位中运行 Spark 作业的单元测试 我有 HADOOP HOME D winutils winutils path D winutils bin winutils exe 我运行了以下命令 winu
  • Pig 10.0 - 将元组分组并在 foreach 中合并包

    我在用着Pig 10 0 我想在 foreach 中合并包 假设我有以下内容visitors alias a b 1 2 3 4 a d 1 3 6 a e 7 z b 1 2 3 我想对第一个字段上的元组进行分组 并将包与一组语义合并以获
  • 如何跟踪hadoop中哪个数据块在哪个数据节点?

    如果复制一个数据块 会复制到哪个数据节点 是否有任何工具可以显示复制块存在的位置 如果您知道文件名 则可以通过 DFS 浏览器查找 转到您的 namenode Web 界面 说 浏览文件系统 并导航到您感兴趣的文件 在页面底部 将列出文件中
  • 2n + 1 法定人数是什么意思?

    我在描述 HBase 的 Zookeeper 配置时遇到过这个问题 但我对这个术语并不熟悉 N 与我的 HBase 集群中的节点数量有关系吗 或者我应该在 Zookeeper 集群中使用的节点数量 2f 1是指你所需要的可靠性 可用性水平
  • Talend 和 Apache Spark?

    我对 Talend 和 Apache Spark 在大数据生态系统中的定位感到困惑 因为 Apache Spark 和 Talend 都可以用于 ETL 有人可以用一个例子解释一下吗 Talend 是一种基于工具的大数据方法 通过内置组件支

随机推荐

  • 测试用例具体设计方法

    目录 一 根据需求写测试用例 二 测试用例具体的设计方法 1 等价类 2 边界值 3 因果图法 4 正交法 5 场景法 6 错误猜测法 一 根据需求写测试用例 1 首先要保证需求的合理性和正确性 要先验证需求 2 分析需求 把大需求细化成小
  • 机器如何识别花的种类

    惊蛰已过 一声春雷 大地开始解冻 条件允许的话 出去晒晒太阳 看看风景 赏赏花 也可以在央视直播云赏花 十多个机位展示了全国各地不同地区的美丽风景 形色 看到好看的花 但是叫不上名字怎么办 这里推荐一下 形色 一款小众的识花应用 带你遇见全
  • 使用arcpy导出要素类和删除要素类中的要素数据

    需求 目的是在指定的GDB数据库中 将要素数据集SourcePolygon下的北京图层中的Layer字段值等于 KZ 控制指标 的记录先导出到新创建的TempPolygon下 导出的图层名为 ControlIndex 然后再将北京图层中的L
  • java知识点——case

    A continue statement can be used only in a loop continue语句只能在循环中使用 A break statement can t be used only in a loop break语
  • 【计算机视觉

    文章目录 一 检测相关 9篇 1 1 Federated Ensemble YOLOv5 A Better Generalized Object Detection Algorithm 1 2 Zero shot Nuclei Detect
  • 使用WinDbg Preview 查看Windows 10蓝屏Dump文件

    从应用商店安装WinDbg Preview 1 登陆应用商店 搜索WinDbg Preview 2 选择获取 我的Windows 10 系统已经安装过了 3 在启动菜单可以找到WinDbg Preview 4 找到蓝屏文件 并选择使用Win
  • 【Github】错误解决:OpenSSL SSL_read: Connection was reset, errno 10054

    git 报错信息 OpenSSL SSL read Connection was reset errno 10054 Git 中 push 或者 pull 报错 OpenSSL SSL read Connection was reset e
  • arthas的trace、watch、tt、profiler命令的使用

    arthas的trace watch tt profiler命令的使用
  • 通过C语言实现小数整数求原码,反码,补码

    通过C语言实现小数 整数求原码 反码 补码 判断输入的值是整数还是小数 是正是负 求纯整数不含符号的原码 求纯小数不含符号的原码 完善整个原码 符号 小数 整数合在一起 将求原码的函数封装在一个函数里 求反码的函数 求补码的函数 main函
  • Linux-线程学习(上)

    本文导航 内容 所占百分比 线程概念 40 线程与进程区别与联系 20 线程优缺点 10 线程控制 创建 终止 等待 30 线程的概念 谈到线程 我们先从进程说起 我们写的程序从硬盘加载到内存开始运行时 进程就产生了 也就是操作系统开始为这
  • 水仙花数(Java实现)

    春天是鲜花的季节 水仙花就是其中最迷人的代表 数学上有个水仙花数 他是这样定义的 水仙花数 是指一个三位数 它的各位数字的立方和等于其本身 比如 153 1 3 5 3 3 3 现在要求输出所有在m和n范围内的水仙花数 import jav
  • 栈溢出原理

    栈溢出原理 文章目录 栈溢出原理 前言 栈 一 栈溢出原理 二 栈保护技术 三 常发生栈溢出的危险函数 四 可利用的栈溢出覆盖位置 总结 前言 栈 栈是一种LIFO的数据结构 应用程序有一到多个用户态栈 栈自底向上增长 由指令PUSH和PO
  • tcpdump抓包注意事项

    使用tcpdump进行抓包 然后用wireshark进行分析的时候 出现了 Packet size limited during capture 也不算是错误 只是数据包里的内容无法完全查看清楚 经过查询 原因是因为我在Linux下进行抓包
  • es6合并对象

    es5 let name name sea age age 15 person Object assign person name age console log person name sea age 15 es6 let name na
  • golang 读取项目配置文件

    golang读取文件配置 介绍golang项目中配置文件的读取相关内容 包括项目结构 具体实现代码等内容 ref 煎鱼 实际上这只是煎鱼博客项目中的一小部分 项目结构 配置读取相关文件结构如下 config文件夹下存放config yaml
  • 大数据从入门到精通(超详细版)之 Hive的配置与基本语法

    前言 嗨 各位小伙伴 恭喜大家学习到这里 不知道关于大数据前面的知识遗忘程度怎么样了 又或者是对大数据后面的知识是否感兴趣 本文是 大数据从入门到精通 超详细版 的一部分 小伙伴们如果对此感谢兴趣的话 推荐大家按照大数据学习路径开始学习哦
  • xman的思维导图快捷键_思维导图软件——MindMaster常用快捷键汇总

    原标题 思维导图软件 MindMaster常用快捷键汇总 思维导图 英文是The Mind Map 又叫心智导图 是表达发散性思维的有效图形思维工具 MindMaster Mac版是最新推出的一款免费跨平台 多功能的思维导图软件 可以帮助您
  • 发明计算机的人的名人名言,60句关于发明的名言

    1 没有艰苦的学习 就没有最简单的科学发明 南斯拉夫谚语 2 需要是发明之母 拉丁谚语 3 天才是不足恃的 聪明是不可靠的 要想顺手拣来的伟大科学发明是不可想象的 华罗庚 4 一项发明创造会带来更多的发明创造 爱默生 5 科学的真正的与合理
  • Selenium下Chrome配置 (含启动无痕界面--无界面浏览器)

    转载 https www cnblogs com kaibindirver p 11432850 html Selenium下Chrome配置 含启动无痕界面 无界面浏览器 例子 设置无界面模式浏览器启动 chrome options we
  • MapReduce shuffle过程详解

    一 MapReduce计算模型 我们知道MapReduce计算模型主要由三个阶段构成 Map shuffle Reduce Map是映射 负责数据的过滤分法 将原始数据转化为键值对 Reduce是合并 将具有相同key值的value进行处理