Hadoop源码分析——MapReduce输入和输出

2023-05-16

Hadoop中的MapReduce库支持集中不同的格式的输入数据。例如,文本模式的输入数据的每一行被视为一个key/value键值对。key是文件的偏移量,value是那一行的内容。另一种常见的格式是以key进行排序来存储key/value键值对的序列。每种输入类型的实现都必须能够把数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理。

1. 输入格式-InputFormat

当运行一个MapReduce作业的时候,我们需要为作业制定它的输入格式。InputFormat作为Hadoop作业的所有数入格式的抽象基类,它描述了作业的输入需要满足的规范细节。

1.1 InputFormate抽象类

InputFormat所在的包为org.apache.hadoop.mapredce,在该抽象类中定义了连个抽象类:

/**
 * 该方法的主要作用是将所有的输入文件分割成逻辑上的多个分片InputSplit,每个InputSplit通过输入文件路  	* 径、开始位置和偏移量三个信息进行来唯一标识。
 */
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
/**
 * 该方法的主要作用是为制定的InputSplit创建记录读取器 	
 */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, nterruptedException;

InputFormat有三个直接子类:FIleInputFormat、DBInputFormat、DelegatingInputFormat。而FileInputFormat又包括TextInputFormat、KeyValueInputFormat、CombineFileInputFormat、NineInputFormat和SequenceFIleInputFormat五个子类。

1.2 FIleInputFormat文件输入格式

文件是Hadoop作业最为常用的格式,FIleInputFormat可以根据文件的总大小来将输入文件分割成若干个输入切片。为了保证整个记录不被截断,我们必须采取相应的处理策略,如通过创建RecordReader来保证记录的完整性,从而为Map提供一个面向记录的逻辑分块的试图。

1.2.1 成员属性

  public static final String INPUT_DIR = 
    "mapreduce.input.fileinputformat.inputdir";//逗号分隔输入路径列表
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";//输入切片的最大size
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";//输入切片的最小size
  public static final String PATHFILTER_CLASS = 
    "mapreduce.input.pathFilter.class";//输入文件的过滤器类,只有通过过滤器的文件才会加入
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
  private static final double SPLIT_SLOP = 1.1;   // 10% slop,切片容量溢出阈值

1.2.2 成员方法

/**
 *获取输入路径上所有文件的操作,每个文件使用FileStatus对象代替,如果输入路径为空或未满足过滤条件将报错
 */
protected List<FileStatus> listStatus(JobContext job) throws IOException
/**
 * 生成输入文件的输入切片的方法,生成的输入切片是FileSplit格式的。
 */  
public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    //首先取得输入切片的上界和下界
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    // generate splits,初始化用于保存生成的输入切片的列表对象
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //取得所有的输入文件列表
    List<FileStatus> files = listStatus(job);
    //对于文件列表中的每一个文件进行相应的分割处理,然后生成文件的输入切片列表
    for (FileStatus file: files) {
      Path path = file.getPath();//取得文件的Path对象
      long length = file.getLen();//取得文件的长度
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {//判断是否为本地的文
            //获取本地文件的所有块信息
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
            //取得文件所在的文件系统
          FileSystem fs = path.getFileSystem(job.getConfiguration());
            //获取文件所在的文件系统的相应块信息
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {//判断文件是否可以被切割
          long blockSize = file.getBlockSize();//取出文件块的大小
            //将文件系统数据块的大小、输入切片的上下界作为参数传递给computeSplitSize方法来计算出真正的输入切边的大小,计算策略为:首先取出块大小和设置的切片大小上界中的较小值,然后在取出上一步计算出的较小值和设置的切片大小的下界的较大值,最终第二部取出的较大值作为实际切片的大小。
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
			//剩余文件大小的初始值为整个切片的大小
          long bytesRemaining = length;
            //如果文件未切割部分大小比切片daxiao的1.1倍还要大,那么就创建一个FileSplit切片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              //首先取得当前切片所在数据块的索引
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              //新建FileSplit,并添加到输入切片列表
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
              //将剩余文件的大小剪去切片大小,并将返回值作为新的剩余文件的大小
            bytesRemaining -= splitSize;
          }
			//当剩余文件大小比切片大小1.1倍小时,将剩余部分作为整个FileSplit切片处理
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable,如果文件是不可切割的,那么将整个文件作为一个FileSplit
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    //如果输入文件是可分割的,但是文件大小为0,那么创建一个默认的FileSplit切片
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

1.3 TextInputFormat文本文件输入格式

TextInputFormat是FIleInputFormat抽象类的默认实现。该输入格式主要针对的是文本类型的文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。该类主要重写了createRecordReader和isSplitable方法:

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
      //创建LineRecordReader行记录读取器,该读取器用于从文件中读取一行
    String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
      //根据是否能够找到与压缩文件对于的编码器来决定是否对输入文件进行分割处理
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

1.4 KeyValueTextInputFormat键值对文本输入格式

该类是与TextInputFormat具有类似性质的针对文本类型文件,文件被分割成许多的行,而且每一行使用换行符后者【Enter】键作为每一行结束标识。只不过KeyValueTextInputFormat所对应的文件中的每一行是由一个特殊的分割符所割成的键值对。

1.5 CombineFileInputFormat组合文件输入格式

1.6 SequenceFileInputFormat序列文件输入格式

2. 输入切片-InputSplit

输入切片InputSplit是一个单独的Map要处理的数据单元。输入切片的数据类型一般都是字节类型。输入切片经过相应的RecordReader处理之后,转化成记录视图的形式,然后交给Map处理。一般一条记录一一个键值对的形式来展现。

InputSplitt所在的包为org.apache.hadoop.mapredce,该抽象类定义了两个抽象方法:

//取得输入切片的大小 
public abstract long getLength() throws IOException, InterruptedException;
//取得保存该输入切片的数据节点的列表
public abstract String[] getLocations() throws IOException, InterruptedException;

InputSplit的实现类包括:FileSplit、CombineFielSplit和DBInputFormat.DBInputSplit。

2.1 FileSplit文件输入切片

FileSplit是默认的InputSplit,这一点可以从FileInputFormat的创建输入切片的方法中体现出来。

2.1.1 成员变量

  private Path file;//该输入切片所在的文件
  private long start;//该输入切片在文件中的起始位置
  private long length;//该输入切片的大小
  private String[] hosts;//保存该输入切片的主机列表
  private SplitLocationInfo[] hostInfos;//保存该输入切片的数据节点列表

2.1.2 成员方法

该类主要的成员方法就是上面成员变量所对应的get方法,以及Wrtiable接口的实现方法。FileSplit只对它所在的文件、起始位置、和切片大小属性进行序列化:

  @Override
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, file.toString());
    out.writeLong(start);
    out.writeLong(length);
  }

同理,FileSplit也只对它所在的文件、起始位置、和切片大小属性进行反序列化,主机列表属性会被默认为0:

  @Override
  public void readFields(DataInput in) throws IOException {
    file = new Path(Text.readString(in));
    start = in.readLong();
    length = in.readLong();
    hosts = null;
  }

2.2 CombineFileSplit多文件输入切片

CombineFielSplit是与前面介绍的CombineFielInputFormat输入格式相对应的输入切片类型。FileSpilt代表一个文件的一个输入切片,而CombinFileSplit切片代表将来自多个文件的输入切片的一个输入切片。虽然每个CombineFIleSplit切片一般会包含来自不同文件的数据块,但是在同一个切片中的所有数据块一般都是在同一个机架上的。

2.2.1 成员变量

  private Path[] paths;//该输入切片所在的文件
  private long[] startoffset;//每个子切片在对应得文件中起始位置
  private long[] lengths;//该输入切片的大小
  private String[] locations;//每个子切片所在的机器名
  private long totLength;//所有子切片的长度纸盒

2.2.2 成员方法

该类主要的成员方法就是上面成员变量所对应的get方法,以及Writable接口的实现方法。

3. 记录读取器-RecordReader

FileInputFormat的一些子类中都实现了createRecordReader方法,并返回了用于处理输入切片的RecordReader。该类包含6个抽象方法:

 public abstract void initialize(InputSplit split,
                                  TaskAttemptContext context
                                  ) throws IOException, InterruptedException;
//初始化方法,该方法只会在初始化执行一次,输入参数包括输入切片和任务尝试上下文。
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//取得输入切片的下一个键值对,如果读取到键值对该方法返回ture,否则返回false。
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//取得当前读取到的键值对的键对象。
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//取得当前读取到的键值对的值对象。
  public abstract float getProgress() throws IOException, InterruptedException;
//取得当前数据读取的进度,该方法会返回0.0到1.0之间的浮点数。
 public abstract void close() throws IOException;
//该方法是java.io的Closeable接口中的方法,用于关闭RecordReader,节约资源。

4. 输出格式-OutputFormat

4.1 OutputFormat抽象类

OutputFormat抽象类描述了MapReduce作业的输出规范,它决定了MapReduce的作业输出结果保存到哪里,以及如何对输出结果进行持久化操作。主要工作有:

  • 检查作业的输出是否有效,比如检查输出目录是否存在;
  • 提供一个具体的RecordWriter实现类。Hadoop依靠该实现类将MapReduce作业的处理结果保存到制定文件系统的文件中,一般写到HDFS文件系统中。
  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;
//检查作业输出目录是否存在。
  public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;
//返回RecordWriter,用来将产生的键值进行输出。
  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context
                                     ) throws IOException, InterruptedException;
//返回OutputCommitter,用来将提交输出结果。

4.2 FileOutputFormat 文件输出格式

该类在工作的过程中利用大量配置对象中所包含的配置项,如是否压缩等。

4.2.1 枚举类

  public static enum Counter {
    BYTES_WRITTEN
  }
//该枚举类型定义了一个定时器,它记录了输出的字节数。

4.2.2 成员变量

 //用于输出文件名的数字部分,对处理结果进行排序输出
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
 //输出文件的基名称相对应得配置项,如类似名称part-00xx中的part部分
  protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
  //默认输出文件的基名称为“part”
  protected static final String PART = "part";
  //对文件类型的输出进行提交的提交器
  private FileOutputCommitter committer = null; 

4.2.3 静态代码块

//用于设置上面定义的NUMBER_FORMAT变量的属性,如设置数值的整数部分最少为5位吗,不够5位的用0填充;  //setGroupingUsed(false)设置数字部分不进行分组操作,如12345不会被格式化为12,345
static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

4.2.4 成员方法

//通过配置项决定输出是否进行压缩操作  
public static void setCompressOutput(Job job, boolean compress) {
    job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
  }
//通过配置项决定采用的压缩器
  public static boolean getCompressOutput(JobContext job) {
    return job.getConfiguration().getBoolean(
      FileOutputFormat.COMPRESS, false);
  }
//通过配置项决定输出目录
public static void setOutputPath(Job job, Path outputDir) {
    ...
}
//生成一个唯一的输出文件名,输出格式为$name-[mr]-$taskid$extension,其中那么就是BASE_OUPUT_NAME
  public synchronized static String getUniqueFile(TaskAttemptContext context,
                                                  String name,
                                                  String extension) {
      ...
  }

4.3 TextOutFormat 文本格式的文件输出格式

4.4 SequenceFileOutputFormat 普通序列文件输出格式

4.5 SequenceFileAsBinaryOutputFormat 二进制序列文件输出格式

4.6 FilterOutputFormat 过滤器输出格式

4.6 DBOutputFormat 数据库输出格式

4.8 MultipleOutputs 多种输出格式

5. 记录写入器-RecordWriter

RecordWriter用于将MapReduce作业的键值对结果写入到制定的输出中。

  //将键值对以制定格式写入到输出目录
  public abstract void write(K key, V value
                             ) throws IOException, InterruptedException;
  //关闭输出,节约资源
  public abstract void close(TaskAttemptContext context
                             ) throws IOException, InterruptedException;

6. 输出提交器-OutputCommitter

OutputCommitter主要用于控制MapReduce作业的输出环境。主要一下工作:

  • 在OutputCommitter初始化时启动jon。比如会创建job的临时输出目录;
  • 在作业完成之后清除job申请的资源。比如会删除job的临时目录;
  • 为Mapper或者Reducer任务创建临时的输出目录;
  • 检查Mapper或者Reducer任务是否需要提交;
  • 提交或者丢弃MapReduce任务的输出。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop源码分析——MapReduce输入和输出 的相关文章

随机推荐

  • 游戏平台SDK设计和开发之旅——XSDK功能点梳理

    做游戏开发或者相关工作的同学 xff0c 可能都知道 xff0c 在游戏上线之前 xff0c 需要将游戏分发到各大渠道平台 xff0c 比如九游 xff0c 百度 xff0c 360 xff0c 华为等等 其中和技术相关的事情 xff0c
  • 谈谈 GitHub 开放私有仓库一事的影响

    GitHub 此次宣布免费开放私有仓库 xff0c 在我看来有以下几点影响 xff1a 缓和与同类产品间的竞争压力小部分个人项目由开源转闭源微软在技术社区中的企业形象进一步强化为未来的企业服务预热 下面根据以上几点 xff0c 我来简单谈下
  • 每天坚持刷 LeetCode 的人,究竟会变得有多强... 学习技巧都藏在这几个公众号里面了......

    信息爆炸时代 xff0c 与其每天被各种看过就忘的内容占据时间 xff0c 不如看点真正对你有价值的信息 xff0c 下面小编为你推荐几个高价值的公众号 xff0c 它们提供的信息能真正提高你生活的质量 人工智能爱好者社区 专注人工智能 机
  • 超酷炫!智能无人机中文教程重磅上线!

    前 言 对于大多数无人机爱好者来说 xff0c 能自己从头开始组装一台无人机 xff0c 之后加入 AI 算法 xff0c 能够航拍 xff0c 可以目标跟踪 xff0c 是心中的梦想 并且 xff0c 亲自从零开始完成复杂系统 xff0c
  • B 站硬件大佬又在 GitHub 上开源了一款神器...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 转自量子位 这次 xff0c 野生钢铁侠稚晖君带着他的硬核项目又来了 上次自制纯手工打造 AI 小电视 xff0c 播放量就超过 300 万
  • 用 C 语言来刷 LeetCode,网友直呼:那是真的牛批...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 大家好 xff0c 我是小 G 如果你是计算机科班出身 xff0c 那么 C 语言 xff0c 估计是你在初入编程时 xff0c 最早接触的编
  • 【pytorch torchvision源码解读系列—3】Inception V3

    框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets torchvisi
  • 【pytorch torchvision源码解读系列—5】DenseNet

    pytorch框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets to
  • Eclipse使用JDBC方式连接SQLServer2016

    Eclipse使用JDBC方式连接SQLServer2016 今天下午在查找很多JDBC连接SQL时发现大多数都是2012甚至更久以前的版本 xff0c 所以就此把步骤记录下来 xff0c 以免自己下次使用又忘记了 在连接的时候 xff0c
  • 魔改《自动化学报》Latex模板

    想用latex写一个中文文档 xff0c 看上了 自动化学报 的模板 xff0c 感觉不错 xff0c 下载下来在本地的tex live上编译 xff0c 报了一大串错 xff1b 上传到overleaf xff0c 还是报错 xff1b
  • TX2安装jetpack

    目前官网支持的下载为JetPack L4T 3 2 1 linux x64 b23和JetPack L4T 3 3 linux x64 b39 首先使用具有Ubuntu16 04的host主机 xff08 我使用的是个人笔记本 xff0c
  • TF-IDF算法

    TF IDF算法 TF IDF term frequency inverse document frequency 是一种用于信息检索与数据挖掘的常用加权技术 xff0c 常用于挖掘文章中的关键词 xff0c 而且算法简单高效 xff0c
  • 大数据009——MapReduce

    分布式离线计算框架MapReduce MapReduce是一种编程模型 Hadoop MapReduce采用Master slave 结构 只要按照其编程规范 xff0c 只需要编写少量的业务逻辑代码即可实现一个强大的海量数据并发处理程序
  • MapReduce实例——wordcount(单词统计)

    1 MR实例开发整体流程 最简单的MapReduce应用程序至少包含 3 个部分 xff1a 一个 Map 函数 一个 Reduce 函数和一个 main 函数 在运行一个mapreduce计算任务时候 xff0c 任务过程被分为两个阶段
  • MapReduce实例——好友推荐

    1 实例介绍 好友推荐算法在实际的社交环境中应用较多 xff0c 比如qq软件中的 你可能认识的好友 或者是Facebook中的好友推介 好友推荐功能简单的说是这样一个需求 xff0c 预测某两个人是否认识 xff0c 并推荐为好友 xff
  • Hadoop源码分析——JobClient

    1 MapReduce作业处理过程概述 当用户使用Hadoop的Mapreduce计算模型来进行处理问题时 xff0c 用户只需要定义所需的Mapper和Reduce处理函数 xff0c 还有可能包括的Combiner Comparator
  • 大数据010——Hive

    1 Hive 概述 Hive 是建立在 Hadoop 上的数据仓库基础构架 它提供了一系列的工具 xff0c 可以用来进行数据提取转化加载 xff08 ETL xff09 xff0c 这是一种可以存储 查询和分析存储在 Hadoop 中的大
  • 大数据011——Sqoop

    1 Sqoop 概述 Sqoop是Hadoop和关系数据库服务器之间传送数据的一种工具 它是用来从关系数据库如 xff1a MySQL xff0c Oracle到Hadoop的HDFS xff0c 并从Hadoop的文件系统导出数据到关系数
  • 大数据012——HBase

    1 HBase 简介 HBase Hadoop Database xff0c 是一个高可靠性 高性能 面向列 可伸缩 实时读写的分布式数据库 xff1b 在Hadoop生态圈中 xff0c 它是其中一部分且利用Hadoop HDFS作为其文
  • Hadoop源码分析——MapReduce输入和输出

    Hadoop中的MapReduce库支持集中不同的格式的输入数据 例如 xff0c 文本模式的输入数据的每一行被视为一个key value键值对 key是文件的偏移量 xff0c value是那一行的内容 另一种常见的格式是以key进行排序