Hadoop源码分析——计算模型MapReduce

2023-05-16

MapReduce 是一个计算模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于 key/value pair 的数据集合;然后在创建一个Reduce函数用来合并所有的具有相同中间 Key 值得中间Value值。

1. Map 处理过程

1.1 Mapper 概述

Mapper函数最核心的作用就是对输入的key/value进行处理,然后输出一系列的 key/value 集合。处理流程可以表示为:

(key,value)——> Mapper——>list<key,value>

在Mapper阶段处理之前,Hadoop的MapReduce框架会为由作业的InputFormat生成的每一个输入切片InputSplit创建一个对应的Mapper处理函数。在Mapper内部,我们可以通过调用JobContext的getConfiguration方法来获得与该作业相关的所有配置信息。

Hadoop会将Mapper的输出key/value按照key进行分组处理,使具有相同输出key的key/value键值对放在一起,然后将它们分给相同的Reducer来处理。用户可以通过指定特定的RawComparator实现类来控制分组过程的执行。此外用户也可以通过指定Partitioner实现类来控制Mapper的输出被分给哪个具体的Reducer进行处理。由于Reducer和Mapper一般会运行在不同的主机上,所以Reducer必须通过网络来获得Mapper的输出结果来作为输入。

为了减少网络上的数据传输量,我们可以为作业指定相应的Combiner。该类一般会采用已经实现好的Reducer来代替。Combiner会在Mapper端对Mapper的输出结果进行本地的聚集处理,从而减少发送给Reducer的数据量。另一种方法是采用相应的压缩机制对Mapper的输出进行压缩处理。另外并不是所有的Hadoop左右都哟Reducer处理函数,当没有Reducer时,mapper的处理结果会直接通过指定的OutputFormat写入到输出目录中。

1.2 Mapper 源代码分析

Mapper类所在的包为 org.apache.hadoop.mapreduce。

1.2.1 成员方法

在Mapper类中,除了run方法是public的之外,其他的map、cleanup和setup方法都是protected类型的访问权限,保证只有自己的子类以及该类相同报下的类可见。我们一般只会重写map方法来完成对输入key/value的处理。

//只会在mapper任务开始的时候调用一次,完成如MultipleOutputs必须在该方法中进行相应的实例化。
protected void setup(Context context ) throws IOException, InterruptedException {
    // NOTHING
  }
//只会在任务结束的时候被调用一次,完成一些作业完毕之后的清理工作
protected void cleanup(Context context ) throws IOException, InterruptedException {
    // NOTHING
  }

//map方法是该类的核心方法,针对每一个输入key/value键值对执行一次。
protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

//该方法在hadoop运行mapper任务的时候被调用
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

1.2.2 内部类

在Mapper中包含一个 Context 内部类,该类只实现了MapContext类,并没有引入任何新方法:

public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}

1.2.3 Mapper 的实现子类

Hadoop 为我们提供了若干个针对不同处理情况的Mapper实现子类。

1.2.3.1 InverseMapper 反转Mapper

该类的实现比较简单,他只是重写了Mapper中的map方法来实现输入的key/value键值对中的key和value进行互换,在进行输出:

  @Override
  public void map(K key, V value, Context context
                  ) throws IOException, InterruptedException {
    context.write(value, key);
  }
1.2.3.2 TokenCounterMapper 标记计数 Mapper

该类在map方法中根据默认的标记来对输入的value进行分解,并进行计数处理:

//对字符串分店进行计数  
private final static IntWritable one = new IntWritable(1);
//将取到的字符串分段转换为Text类型
private Text word = new Text();  
@Override
public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
}
1.2.3.3 multithreadMapper 多线程 Mapper

MultithreadMapper 的主要工作原理就是启动多个线程来执行另一个Mapper中的map方法,这种方式可以有效的提供系统处理作业的能力。其中启动的线程个数由mapred.map.multithreadrunner.threads 配置项决定,默认为10个线程。要执行的Mapper通过mapred.map.multithreadrunner.class配置项决定。MultithreadMapper分别提供了用于设置上面两个配置项的set方法。

1)、成员变量

//MultithreadMapper要执行的Mapper类  
private Class<? extends Mapper<K1,V1,K2,V2>> mapClass;
//用于将MultithreadMapper的处理结果进行输出的MapContext
private Context outer;
//运行map方法的MapRunner列表,其中MapRunner为MultithreadMapper中的一个内部类
private List<MapRunner> runners;

2)、成员方法

  /**
   * 获取MultithreadMapper启动的线程数
   */
  public static int getNumberOfThreads(JobContext job) {
    return job.getConfiguration().getInt(NUM_THREADS, 10);
  }
  /**
   * 设置MultithreadMapper启动的线程数
   */
  public static void setNumberOfThreads(Job job, int threads) {
    job.getConfiguration().setInt(NUM_THREADS, threads);
  }
  /**
   * 获取MultithreadMapper所处理的Mapper类
   */
  public static <K1,V1,K2,V2>
  Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
    return (Class<Mapper<K1,V1,K2,V2>>) 
      job.getConfiguration().getClass(MAP_CLASS, Mapper.class);
  }
  /**
   * 设置MultithreadMapper所处理的Mapper类
   */
     public static <K1,V1,K2,V2> 
  void setMapperClass(Job job, 
                      Class<? extends Mapper<K1,V1,K2,V2>> cls) {
    if (MultithreadedMapper.class.isAssignableFrom(cls)) {
      throw new IllegalArgumentException("Can't have recursive " + 
                                         "MultithreadedMapper instances.");
    }
    job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class);
  /**
   * run方法按指定的线程个数运行指定的Mapper类中的run方法
   */
           public void run(Context context) throws IOException, InterruptedException {
    outer = context;
    //首先取得需要创建的线程数
    int numberOfThreads = getNumberOfThreads(context);
    //然后取得被执行的Mapper类
    mapClass = getMapperClass(context);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
                " threads");
    }
    //初始化numberofThread是数量的线程,然后放入到线程池中
    runners =  new ArrayList<MapRunner>(numberOfThreads);
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = new MapRunner(context);
      thread.start();
      runners.add(i, thread);
    }
    //对所有的线程执行join操作
    for(int i=0; i < numberOfThreads; ++i) {
      MapRunner thread = runners.get(i);
      thread.join();
        ......
      }
    }
  }

3)、内部类

MultithreadMapper 中包含有四个内部类,由于被处理的Mapper共享一份InputSplit,所以InputSpilt数据的读取必须是线程安全的,这些子类通过互斥的访问MultithreadMapper中的Context来实现线程安全机制。

/**
 *SubMapRecordReader类主要作用就是从指定的Context中复制键值对并保证这个操作是同步互斥进行的
 */
private class SubMapRecordReader extends RecordReader<K1,V1> {
    ......
}
/**
 *SubMapRecordWriter类主要作用就是将Mapper出来的结果写入到指定的输出,这个操作也是同步互斥的
 */
private class SubMapRecordWriter extends RecordWriter<K2,V2> {
    ......
}
/**
 *SubMapStatusReporter类的主要作用就是汇报MultithreadMapper的处理进度信息
 */
private class SubMapStatusReporter extends StatusReporter {
    ......    
}
/**
 *MapRunner类是真正的线程执行类,它会调用指定Mapper中的run方法
 */
private class MapRunner extends Thread {
    ......
        //首先通过反射机制创建被处理的Mapper的实例,然后初始化与该Mapper相对应得执行上下文
        MapRunner(Context context) throws IOException, InterruptedException {
        mapper = ReflectionUtils.newInstance(mapClass, 
                                             context.getConfiguration());
        MapContext<K1, V1, K2, V2> mapContext = 
            new MapContextImpl<K1, V1, K2, V2>(outer.getConfiguration(), 
                                               outer.getTaskAttemptID(),
                                               reader,
                                               new SubMapRecordWriter(), 
                                               context.getOutputCommitter(),
                                               new SubMapStatusReporter(),
                                               outer.getInputSplit());
        subcontext = new WrappedMapper<K1, V1, K2, V2>().getMapContext(mapContext);
        reader.initialize(context.getInputSplit(), context);
    }
    @Override
    public void run() {
        try {
            mapper.run(subcontext);
            reader.close();
        } catch (Throwable ie) {
            throwable = ie;
        }
    }
}
1.2.3.4 FieldSelectionMapper 字段选择Mapper

该类所在的包为org.apache.hadoop.mapreduce.lib.fieldsel,可以用于高效灵活的处理文本数据。该类将输入数据看做由用户指定的分隔符分割不同字段组成,默认的分隔符为Tab。可以选择输入字段列表中的若干个字段作为输出的key和value。

1.2.3.5 DelegatingMapper 授权 Mapper

DelegatingMapper的实现机制和DelegatingRecordReader的实现机制类似,他们都作为包装类来存在,正正功能的实现是靠内部封装的类来完成的。

2. Reducer 处理过程

2.1 Reducer 概述

Reducer的主要作用就是讲Mapper的输出结果中具有相同key的键值对进行进一步的reduce(规约)处理,从而产生更少的输出键值对。与Mapper不同的是,Reducer的数量是可以通过Job的setNumReduceTasks方法进行设置的。Reducer包括如下三个主要阶段:

  • Shuffle阶段:由于Reducer的输入是来自Mappe已经排好序的输出,而Reducer和Mapper一般在不同主机上,所以Reducer所在的第一步操作就是利用HTTP网络协议将所有Mapper的输出中与该Reducer相关的数据复制到Reducer的主机上。
  • Sort阶段:在该阶段,MapReduce框架会将来自不同Mapper的具有相同key的输出key/value键值对按照key进行排序。
  • Reduce阶段:该阶段,MapReduce框架会为已经分好组的每一个<key,(list of value)>调用一次reduce方法。Reducer的输出键值对通过RecordWriter写入到真正的文件系统。上面的Shuffle和sort两个阶段是同时进行的,Mapper的输出也是一边被取回一边被合并的。MapReduce还为我们提供了一个SecondarySort(二次排序)阶段。为了实现SecondarySort,我们需要定义一个group comparator(分组比较器)。sort阶段使用sort comparator针对key来进行操作,该阶段会将具有相同key的value放置在一个列表里:而group阶段是依赖group comparator进行的,该阶段会针对某个key所对应的value列表进行分组操作,使整个value列表分割成不同的组。

2.2 Reducer 源代码

与Mapper类中的一样,Reducer类中除了run方法是public,其它方法都是protected修饰的。

2.2.1 成员方法

//只会在Reducer任务开始的时候调用一次
protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }
  //只会在任务结束的时候被调用一次
protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  //reduce方法是该类的核心方法,在map方法中输入的是<key,alue>,而reduce输入的是<key,list(value)>
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

2.2.2 内部类

在Reducer中也包含一个Context内部类,该类只是继承了ReduceContex类,并没有引入任何新方法。

2.2.3 IntSunReducer 和 LongSumReducer

这两个类都在lib.reduce包中,它们实现的目的都是一样的,即计算相同key所对应的所有value之和,如下列出IntSumReducer的源代码如下:

//定义用于保存计算结果之和IntWritable变量  
private IntWritable result = new IntWritable(); 
public void reduce(Key key, Iterable<IntWritable> values, 
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    //计算所有的value之和
    for (IntWritable val : values) {
      sum += val.get();
    }
    //将int类型的结果化为IntWritable类型
    result.set(sum);
    context.write(key, result);
  }

2.2.4 FieldSelectionReducer 字段选择 Reducer

该类将输入数据分割成若干个字段,然后针对这些字段进行相应的处理。

至此,Mapper和Reducer就都分析完毕了。在Mapper和Reducer之间还有一些特殊的处理步骤,比如Combiner以及Partitioner。

3. Partitioner 分区处理过程

3.1 Partitioner 概述

Partitioner 分区处理过程在Mapper之后,Reducer之前进行执行。它的主要作用就是把Mapper输出的中间结果按照key分给不同的Reducer任务进行处理。要保证Hadoop的负载均衡,Partitioner需要满足以下的两个条件:

  • 平均分布。即每个Reducer处理的reduce数量应该尽可能相等。
  • 高效。由于Mapper输出的每个key/value键值对在分发给Reduce处理之前都需要Partitioner的相应处理,所以它的效率至关重要,需要使用高效的算法实现。

3.1.2 Partitioner 源代码

//根据给的的键值对和分区的总数量(一般为Reducer任务的数量),返回该键值对所对应的分区号
public abstract class Partitioner<KEY, VALUE> {
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}

3.1.3 HashPartitioner hash 分区

Had和Partitioner是Partitioner的默认实现类,该类会使用hash函数来对Mapper的输出进行分区处理。

//根据key的hash码和Reducer任务的数量来产生对应得分区号。  
public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

该方法会将Mapper输出的键值对均匀的分发给不同的Reducer。比如Key为Text的话,Text的hashcode方法跟String的基本机制,都采用Horner公式计算,得到一个int。string太大的话,这个int值可能会溢出成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布子啊reduce上。

3.1.4 BinaryPartitioner 二进制分区

BinaryPartitioner 处理的键值对值中的key必须是BinaryComparable字节可以比较类型的,例如Text就是一个BinaryComparable的实例。因为Text可以按字节进行比较,所以当两个Text的实例对于的字节不相等时就可以立即判断出他们的大小关系。BinaryPartitioner会利用BinaryComparable类型的key的getBytes()方法返回对应得字节数组中的一部分求出该键值对所对应得分区号。

3.1.5 KeyFieldBasedPartitioner 基于键字段的分区

KeyFieldBasedPartitioner的处理逻辑就是首先将key分割成由不同的字段的字段列表,然后取得列表中的若干个字段进行分区处理。其中key中不同字段之间的分隔符通过mapreduce.fields.for.partition配置项来确定取出key的前几个字段用作分区处理。

3.1.6 TotalOrderPartitioner 全排序分区

虽然每个Mapper的输出是排好序的,但是不同的Mapper的输出之间是没有顺序的。为了实现最终的Reducer的输出是排好序的,此时可以使用TotalOrderPartitioner。由于TotalOrderPartitioner本身无法确定数据的分布情况,所以它所作的第一件事情就是利用InputSampler数据采样器来确定数据的分布情况。获得数据的分布情况之后,接下来就是根据数据的分布情况来采取不同的分区策略。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop源码分析——计算模型MapReduce 的相关文章

随机推荐

  • 每天坚持刷 LeetCode 的人,究竟会变得有多强... 学习技巧都藏在这几个公众号里面了......

    信息爆炸时代 xff0c 与其每天被各种看过就忘的内容占据时间 xff0c 不如看点真正对你有价值的信息 xff0c 下面小编为你推荐几个高价值的公众号 xff0c 它们提供的信息能真正提高你生活的质量 人工智能爱好者社区 专注人工智能 机
  • 超酷炫!智能无人机中文教程重磅上线!

    前 言 对于大多数无人机爱好者来说 xff0c 能自己从头开始组装一台无人机 xff0c 之后加入 AI 算法 xff0c 能够航拍 xff0c 可以目标跟踪 xff0c 是心中的梦想 并且 xff0c 亲自从零开始完成复杂系统 xff0c
  • B 站硬件大佬又在 GitHub 上开源了一款神器...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 转自量子位 这次 xff0c 野生钢铁侠稚晖君带着他的硬核项目又来了 上次自制纯手工打造 AI 小电视 xff0c 播放量就超过 300 万
  • 用 C 语言来刷 LeetCode,网友直呼:那是真的牛批...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 大家好 xff0c 我是小 G 如果你是计算机科班出身 xff0c 那么 C 语言 xff0c 估计是你在初入编程时 xff0c 最早接触的编
  • 【pytorch torchvision源码解读系列—3】Inception V3

    框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets torchvisi
  • 【pytorch torchvision源码解读系列—5】DenseNet

    pytorch框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets to
  • Eclipse使用JDBC方式连接SQLServer2016

    Eclipse使用JDBC方式连接SQLServer2016 今天下午在查找很多JDBC连接SQL时发现大多数都是2012甚至更久以前的版本 xff0c 所以就此把步骤记录下来 xff0c 以免自己下次使用又忘记了 在连接的时候 xff0c
  • 魔改《自动化学报》Latex模板

    想用latex写一个中文文档 xff0c 看上了 自动化学报 的模板 xff0c 感觉不错 xff0c 下载下来在本地的tex live上编译 xff0c 报了一大串错 xff1b 上传到overleaf xff0c 还是报错 xff1b
  • TX2安装jetpack

    目前官网支持的下载为JetPack L4T 3 2 1 linux x64 b23和JetPack L4T 3 3 linux x64 b39 首先使用具有Ubuntu16 04的host主机 xff08 我使用的是个人笔记本 xff0c
  • TF-IDF算法

    TF IDF算法 TF IDF term frequency inverse document frequency 是一种用于信息检索与数据挖掘的常用加权技术 xff0c 常用于挖掘文章中的关键词 xff0c 而且算法简单高效 xff0c
  • 大数据009——MapReduce

    分布式离线计算框架MapReduce MapReduce是一种编程模型 Hadoop MapReduce采用Master slave 结构 只要按照其编程规范 xff0c 只需要编写少量的业务逻辑代码即可实现一个强大的海量数据并发处理程序
  • MapReduce实例——wordcount(单词统计)

    1 MR实例开发整体流程 最简单的MapReduce应用程序至少包含 3 个部分 xff1a 一个 Map 函数 一个 Reduce 函数和一个 main 函数 在运行一个mapreduce计算任务时候 xff0c 任务过程被分为两个阶段
  • MapReduce实例——好友推荐

    1 实例介绍 好友推荐算法在实际的社交环境中应用较多 xff0c 比如qq软件中的 你可能认识的好友 或者是Facebook中的好友推介 好友推荐功能简单的说是这样一个需求 xff0c 预测某两个人是否认识 xff0c 并推荐为好友 xff
  • Hadoop源码分析——JobClient

    1 MapReduce作业处理过程概述 当用户使用Hadoop的Mapreduce计算模型来进行处理问题时 xff0c 用户只需要定义所需的Mapper和Reduce处理函数 xff0c 还有可能包括的Combiner Comparator
  • 大数据010——Hive

    1 Hive 概述 Hive 是建立在 Hadoop 上的数据仓库基础构架 它提供了一系列的工具 xff0c 可以用来进行数据提取转化加载 xff08 ETL xff09 xff0c 这是一种可以存储 查询和分析存储在 Hadoop 中的大
  • 大数据011——Sqoop

    1 Sqoop 概述 Sqoop是Hadoop和关系数据库服务器之间传送数据的一种工具 它是用来从关系数据库如 xff1a MySQL xff0c Oracle到Hadoop的HDFS xff0c 并从Hadoop的文件系统导出数据到关系数
  • 大数据012——HBase

    1 HBase 简介 HBase Hadoop Database xff0c 是一个高可靠性 高性能 面向列 可伸缩 实时读写的分布式数据库 xff1b 在Hadoop生态圈中 xff0c 它是其中一部分且利用Hadoop HDFS作为其文
  • Hadoop源码分析——MapReduce输入和输出

    Hadoop中的MapReduce库支持集中不同的格式的输入数据 例如 xff0c 文本模式的输入数据的每一行被视为一个key value键值对 key是文件的偏移量 xff0c value是那一行的内容 另一种常见的格式是以key进行排序
  • 大数据013——Flume

    1 Flume 简介 Flume是由Cloudera软件公司提供的一个高可用的 xff0c 高可靠的 xff0c 分布式的海量日志采集 聚合和传输的系统 xff0c 后与2009年被捐赠了apache软件基金会 xff0c 为hadoop相
  • Hadoop源码分析——计算模型MapReduce

    MapReduce 是一个计算模型 xff0c 也是一个处理和生成超大数据集的算法模型的相关实现 用户首先创建一个Map函数处理一个基于key value pair的数据集合 xff0c 输出中间的基于 key value pair 的数据