MapReduce之Map阶段

2023-05-16

MapReduce阶段分为map,shuffle,reduce。

map进行数据的映射,就是数据结构的转换,shuffle是一种内存缓冲,同时对map后的数据分区、排序。reduce则是最后的聚合。

此文探讨map阶段的主要工作。

map的工作

      • 代码介绍
      • split
      • 启动mapTask

代码介绍

我们还是准备word count的代码:

maper

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    Text mapOutKey = new Text();
    LongWritable mapOutValue = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        String[] words = line.split(" ");

        for (String word : words) {
            mapOutKey.set(word);
            context.write(mapOutKey, mapOutValue);
        }
    }
}

reducer

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable reduceOutValue = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        Long sum = 0L;

        for (LongWritable value : values) {
            sum += value.get();
        }
        reduceOutValue.set(sum);

        context.write(key, reduceOutValue);
    }
}

partitioner

public class WordCountPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text text, LongWritable longWritable, int numPartitions) {

        int partition = 0;

        String word = text.toString();

        if("I".equalsIgnoreCase(word)){
            partition = 1;
        }

        return partition;
    }
}

driver

public class WordCountDriver implements Tool {

    private Configuration configuration = null;

    static {
        try {
            // 设置 HADOOP_HOME 目录
            System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-3.1.0");
            // 加载库文件
            System.load("D:\\hadoop\\hadoop-3.1.0\\bin\\hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.configuration);

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setPartitionerClass(WordCountPartitioner.class);
        job.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\blogInput"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\blogOutput"));

        boolean result = job.waitForCompletion(true);
        return result ? 1 : 0;
    }

    @Override
    public void setConf(Configuration conf) {
        configuration = conf;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new WordCountDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

input

I like java
I like scala
I like python
I hate hadoop

使用自定的partitioner是因为我们想看到分区的逻辑。

如果你没有自己的partitioner,一个分区的话(默认就是一个reduceTask和一个partition):

//        job.setPartitionerClass(WordCountPartitioner.class);
//        job.setNumReduceTasks(2);

框架给你自己new一个匿名类,返回的partition只有0,即一个分区:

在这里插入图片描述
如果你没有自己的partitioner,但是reduceTask的数量超过了一个,比如这样:

//        job.setPartitionerClass(WordCountPartitioner.class);
        job.setNumReduceTasks(2);

此时默认用的是HashPartitioner

在这里插入图片描述

split

hdfs中需要分块,那是实际的将文件切分存储到磁盘。

mapreduce的map会涉及到切片(split),那是一种逻辑的分割,就是代码层面的,每个切片开一个mapTask来处理。

首先明确我们的环境,因为使用的是本地运行,所以任务是提交到本地而非yarn:

在这里插入图片描述

由于是本地,所以也无需提交jar包,yarn的话就需要了:

在这里插入图片描述

接下来开始切片,它会将切片文件放在本地临时文件夹:

在这里插入图片描述

切片大小由三个值决定,块大小,一个最大值,一个最小值。

本地模式下块大小是32MB,因为他认为你本地资源不足。

最小值我这里是1,最大值是Long的最大值,你可以去配。

在这里插入图片描述

在这里插入图片描述
最后算出来切片大小就是块大小。

或者我们可以说,默认情况下切片大小就是块大小。

但这里又不是简单按照32MB进行切的:

在这里插入图片描述

SPLIT_SLOP是1.1,也就是说,如果文件大小是40MB,没问题,切成两片,一片32MB,一片8MB;但如果文件大小是32.1MB,它去除以32没有大于1.1,那就还是一片。

其实这就防止了小文件的产生,32.1MB还切成两片,给0.1MB开启一个mapTask,耗费1G的内存,任务启动时间大于任务计算时间,这是不合理的。

最后将切片信息写到本地:

在这里插入图片描述

我们再返回到map的处理流程:

在这里插入图片描述

从这里也可以看到切片的数量影响mapTask的数量。

接着我们将提交配置信息。

在这里插入图片描述

启动mapTask

当我们new job的时候调用

在这里插入图片描述

一个任务就开启了。

在这里插入图片描述
如果没有reduce阶段,那么直接map即可。

如果有reduce阶段,那么先map再sort。

在这里插入图片描述
这个方法将真正启动map。

首先,我们需要一个采集器,接受map端输出的数据:

在这里插入图片描述

初始化采集器时,我们将注意到几个重要的参数,它们将构建后来的缓冲区和排序和溢写:

在这里插入图片描述

缓冲区的默认大小是100MB。

spillper0.8是溢写的阈值,就是内存到80MB了,那就把数据写到磁盘上,这样可以保证读取数据和写出数据都正常进行。

索引的缓存上限是1MB,如果索引在内存中大于1MB了,那就写到磁盘。

在这里插入图片描述

缓冲区的数据溢写到磁盘之前要进行排序,排序的算法是快排。

缓冲区叫做kvbuffer,100MB。

存索引的叫kvmeta,这是一个IntBuffer,它的大小其实也是100MB,只不过1的int占4个字节,所以它的capacity就是100MB/4。

bufstart = bufend = bufindex = equator;都是0,这是和真实数据相关的。

kvstart = kvend = kvindex;都是26214396,它是存元数据的kvmeta的一个位置,最后将不断往前跳4个位置来存储元数据。

bufferRemaining默认就是80MB(图上没显示)。

初始化采集器的时候同时溢写线程也开启了:

在这里插入图片描述

map的初始化工作算是完成了。接着就要走进我们程序的map处理逻辑了:

在这里插入图片描述

在这里插入图片描述

读取的第一行是I like java

我们将key为I,value为1的数据往外写,写到哪里去呢?就是初始化好的缓冲区和存元数据的IntBuffer。我们把下个阶段叫shuffle。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MapReduce之Map阶段 的相关文章

  • 如何在有或没有 Pig 的情况下使用 Cassandra 的 Map Reduce?

    有人可以解释 MapReduce 如何与 Cassandra 6 配合使用吗 我已经阅读了字数统计示例 但我不太明白 Cassandra 端与 客户端 端发生的情况 https svn apache org repos asf cassan
  • MongoDB 根据 _id 统计每分钟新文档数

    我想创建每分钟存储多少新文档的统计数据 由于具有标准 ObjectID 的 id 字段已经包含文档创建的时间戳 我认为应该可以以某种方式使用它 在 Stackoverflow 上 我发现了以下映射归约代码 可以在有用于创建数据的专用字段时完
  • security.UserGroupInformation:MR 的 PrivilegedgedActionException 错误

    每当我尝试执行映射缩减作业以写入 Hbase 表时 我都会在控制台中收到以下错误 我正在从用户帐户运行 MR 作业 错误 security UserGroupInformation PriviledgedActionException 为
  • 在mongo中执行优先级查询

    样本文件 name John age 35 address join month 3 的员工优先级为 1 地址包含字符串 Avenue 的员工优先级为 2 地址包含字符串 Street 的员工优先级为 3 地址包含字符串 Road 的员工优
  • 仅使用一个映射器的 Hadoop gzip 输入文件[重复]

    这个问题在这里已经有答案了 可能的重复 为什么 hadoop 不能分割一个大文本文件 然后使用 gzip 压缩分割的内容 https stackoverflow com questions 6511255 why cant hadoop s
  • JA017:无法查找已启动的 hadoop 作业 ID

    当我在Hue的Oozie编辑器中提交mapreduce作业时 如何解决这个问题 JA017 无法查找与操作 0000009 150711083342968 oozie root W mapreduce f660 关联的已启动 hadoop
  • 如何具体确定MRJob中每个map步骤的输入?

    我正在从事一项地图缩减工作 包含多个步骤 使用 mrjob 每个步骤都会接收上一步的输出 问题是我不想这样 我想要的是提取一些信息并在第二步中针对所有输入等使用它 可以使用 mrjob 来做到这一点吗 Note 因为我不想使用emr 这个问
  • 使用 MultipleOutputs 写入 MapReduce 中的 HBase

    我目前有一个 MapReduce 作业 它使用 MultipleOutputs 将数据发送到多个 HDFS 位置 完成后 我使用 HBase 客户端调用 在 MR 之外 将一些相同的元素添加到一些 HBase 表中 使用 TableOutp
  • FAILED 错误:java.io.IOException:所有收集器的初始化失败

    我在运行 MapReduce WordCount 作业时遇到一些错误 错误 java io IOException 所有收集器的初始化 失败的 最后一个收集器中的错误是 class wordcount wordmapper at org a
  • 如何处理 YARN MapReduce 作业的容器故障?

    YARN 中如何处理软件 硬件故障 具体来说 如果容器发生故障 崩溃 会发生什么 容器和任务失败由节点管理器处理 当容器失败或死亡时 节点管理器会检测到失败事件并启动一个新容器来替换失败的容器并在新容器中重新启动任务执行 如果应用程序主机发
  • 如何使用新的 Hadoop API 来使用 MultipleTextOutputFormat?

    我想编写多个输出文件 如何使用 Job 而不是 JobConf 来执行此操作 创建基于密钥的输出文件名的简单方法 input data type key value cupertino apple sunnyvale banana cupe
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • mongodb 聚合随机化(shuffle)结果

    我正在浏览一堆 mongo 文档 但找不到洗牌或随机化结果内容的可能性 有没有 特别是对于聚合框架本身来说 实际上并没有任何本地方法 因为还没有可用的运算符来执行诸如生成随机数之类的操作 因此 无论您可能投射一个字段进行排序的任何匹配 都不
  • MongoDB:在没有并行性的情况下使用 MapReduce 有什么意义?

    Quoting http www mongodb org display DOCS MapReduce MapReduce Parallelism http www mongodb org display DOCS MapReduce Ma
  • Python - Map/Reduce - 如何在使用 DISCO 计数单词示例中读取 JSON 特定字段

    我正在按照 DISCO 示例来计算文件中的单词数 将单词数作为 Map Reduce 作业 http discoproject org doc disco start tutorial html 我对此工作没有任何问题 但是我想尝试从包含
  • 遍历 ArrayWritable - NoSuchMethodException

    我刚刚开始使用 MapReduce 并且遇到了一个奇怪的错误 我无法通过 Google 回答该错误 我正在使用 ArrayWritable 制作一个基本程序 但是当我运行它时 在Reduce过程中出现以下错误 java lang Runti
  • Riak 在 MapReduce 查询中失败。使用哪种配置?

    我正在与 riak riak js 结合开发一个 nodejs 应用程序 并遇到以下问题 运行此请求 db mapreduce add logs run 正确返回存储在存储桶日志中的所有 155 000 个项目及其 ID logs 1GXt
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测
  • Apache Spark 何时发生混洗?

    我正在优化 Spark 中的参数 并且想确切地了解 Spark 是如何对数据进行洗牌的 准确地说 我有一个简单的字数统计程序 并且想知道spark shuffle file buffer kb如何影响运行时间 现在 当我将此参数设置得非常高
  • MongoDB 存储过程等效项

    我有一个包含商店列表的大型 CSV 文件 其中一个字段是邮政编码 我有一个名为 ZipCodes 的独立 MongoDB 数据库 它存储任何给定邮政编码的纬度和经度 在 SQL Server 中 我将执行一个名为 InsertStore 的

随机推荐