Mapreduce的简单实现和步骤

2023-05-16

package com.qfedu.bigdata.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**

  • @ClassName wordCount
  • @Description TODO
  • @Author Chenfg
  • @Date 2018/9/20 0020 11:22
  • @Version 1.0
  • 词频统计
  • 输出数据:
  • word,n
  • Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  • 框架在调用咱们写的map方法时,会将数据作为参数(一个key,一个value)传递给map方法
    
  • KEYIN:是框架(maptask)要传递给map方法的输入参数中的key的数据类型
    
  • VALUEIN:是框架(maptask)要产地给map方法的输入参数中的value的数据类型
    
  • 在默认情况下,框架传入的key是框架从待处理数据(文本文件)中读取到的‘某一行’数据的起始偏移量,所以类型Long
    
  • 框架传入的value是框架从待处理数据中读到的‘某一行’的内容,所以类型是String
    
  • 但是,Long或者String等java的原生态数据类型的序列化的效率较低,所以hadoop对其进行了封装改造,
    
  • 有替代品:LongWriable/Text
    
  • map方法处理完数据后需要返回一个结果(一个key一个value的键值对数据)
    
  • KEYOUT:是咱们的map方法处理完成后返回结果中的key的数据类型
    
  • VALUEOUT:是咱们的map方法处理完成后返回结果中的value的数据类型
    
  • 思路:
    
  • map阶段的处理逻辑:
    
  • 1、用空格切分单词
    
  • 2、循环遍历单词
    
  • 3、输出结果,<word,1>
    
  •     map方法的调用规律:
    
  •     maptask没读取到一行数据就会调用此意map方法
    

*/
public class wordCount {
static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、数据类型转换
String line = value.toString();

        //2、切分数据
        String[] words = line.split(" ");

        //3、循环遍历单词
        for (String word : words) {
            //4、输出结果
            context.write(new Text(word),new LongWritable(1));
        }
    }
}

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     reduce()方法要接收的输入参数是一个key一个迭代器<T>的values
 *     KEYIN:框架(reducetask)要传递给reduce方法的输入参数的key的数据类型
 *     VALUEIN:框架(reducetask)要传递给reduce方法的输入参数的value的数据类型
 *
 *
 *      KEYOUT:reduce方法处理后的数据的返回结果的key的数据类型
 *      VALUEOUT:reduce方法处理后的数据的返回结果的value的数据类型
 *
 *     reduce方法调用的规律:框架会从map阶段的输出结果中找出所有的key相同的<k,v>数据对组成一组数据,
 *     然后调用一次reduce()方法
 */
static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //为什么用迭代器
        /**
         * map端的输出数据:
         <a,1>
         <b,1>
         <c,1>
         <a,1>
         <a,1>
         <d,1>

         reduce端的输入数据:
         <a,<1,1,1>>
         */

        int count=0;
        //循环遍历迭代器
        for (LongWritable i :values) {
            count+=i.get();
        }

        //往下发送数据
        context.write(key,new LongWritable(count));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //1、创建一个Configuration配置项
    Configuration conf = new Configuration();

    //2、配置连接参数
    conf.set("fs.defaultFS","hdfs://qianfeng");

    //高可用集群的设置项
    conf.set("fs.defaultFS","hdfs://qianfeng");
    conf.set("dfs.nameservices","qianfeng");
    conf.set("dfs.ha.namenodes.qianfeng","nn1,nn2");
    conf.set("dfs.namenode.rpc-address.qianfeng.nn1","hadoop01:9000");
    conf.set("dfs.namenode.rpc-address.qianfeng.nn2","hadoop02:9000");
    conf.set("dfs.client.failover.proxy.provider.qianfeng","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    //3、创建一个job对象
    Job job = Job.getInstance(conf,"wordCount");

    //4、描述对象
    //5、设置Job的执行路径
    job.setJarByClass(wordCount.class);

    //6、设置mapTask调用的业务逻辑类
    job.setMapperClass(WCMapper.class);

    //7、设置map端数据输出的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    //8、设置mapTask调用的业务类
    job.setReducerClass(WCReducer.class);

    //9、设置reduce端的数据的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    //10、设置Job的输入文件的路径
    FileInputFormat.setInputPaths(job,new Path(args[0]));

    //11、设置Job的输出文件的路径
    FileOutputFormat.setOutputPath(job,new Path(args[1]));

    //12、提交job

// job.submit();
boolean b = job.waitForCompletion(true);

    System.exit(b?0:1);
}package com.qfedu.bigdata.MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**

  • @ClassName wordCount
  • @Description TODO
  • @Author Chenfg
  • @Date 2018/9/20 0020 11:22
  • @Version 1.0
  • 词频统计
  • 输出数据:
  • word,n
  • Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  • 框架在调用咱们写的map方法时,会将数据作为参数(一个key,一个value)传递给map方法
    
  • KEYIN:是框架(maptask)要传递给map方法的输入参数中的key的数据类型
    
  • VALUEIN:是框架(maptask)要产地给map方法的输入参数中的value的数据类型
    
  • 在默认情况下,框架传入的key是框架从待处理数据(文本文件)中读取到的‘某一行’数据的起始偏移量,所以类型Long
    
  • 框架传入的value是框架从待处理数据中读到的‘某一行’的内容,所以类型是String
    
  • 但是,Long或者String等java的原生态数据类型的序列化的效率较低,所以hadoop对其进行了封装改造,
    
  • 有替代品:LongWriable/Text
    
  • map方法处理完数据后需要返回一个结果(一个key一个value的键值对数据)
    
  • KEYOUT:是咱们的map方法处理完成后返回结果中的key的数据类型
    
  • VALUEOUT:是咱们的map方法处理完成后返回结果中的value的数据类型
    
  • 思路:
    
  • map阶段的处理逻辑:
    
  • 1、用空格切分单词
    
  • 2、循环遍历单词
    
  • 3、输出结果,<word,1>
    
  •     map方法的调用规律:
    
  •     maptask没读取到一行数据就会调用此意map方法
    

*/
public class wordCount {
static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、数据类型转换
String line = value.toString();

        //2、切分数据
        String[] words = line.split(" ");

        //3、循环遍历单词
        for (String word : words) {
            //4、输出结果
            context.write(new Text(word),new LongWritable(1));
        }
    }
}

/**
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     reduce()方法要接收的输入参数是一个key一个迭代器<T>的values
 *     KEYIN:框架(reducetask)要传递给reduce方法的输入参数的key的数据类型
 *     VALUEIN:框架(reducetask)要传递给reduce方法的输入参数的value的数据类型
 *
 *
 *      KEYOUT:reduce方法处理后的数据的返回结果的key的数据类型
 *      VALUEOUT:reduce方法处理后的数据的返回结果的value的数据类型
 *
 *     reduce方法调用的规律:框架会从map阶段的输出结果中找出所有的key相同的<k,v>数据对组成一组数据,
 *     然后调用一次reduce()方法
 */
static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //为什么用迭代器
        /**
         * map端的输出数据:
         <a,1>
         <b,1>
         <c,1>
         <a,1>
         <a,1>
         <d,1>

         reduce端的输入数据:
         <a,<1,1,1>>
         */

        int count=0;
        //循环遍历迭代器
        for (LongWritable i :values) {
            count+=i.get();
        }

        //往下发送数据
        context.write(key,new LongWritable(count));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //1、创建一个Configuration配置项
    Configuration conf = new Configuration();

    //2、配置连接参数
    conf.set("fs.defaultFS","hdfs://qianfeng");

    //高可用集群的设置项
    conf.set("fs.defaultFS","hdfs://qianfeng");
    conf.set("dfs.nameservices","qianfeng");
    conf.set("dfs.ha.namenodes.qianfeng","nn1,nn2");
    conf.set("dfs.namenode.rpc-address.qianfeng.nn1","hadoop01:9000");
    conf.set("dfs.namenode.rpc-address.qianfeng.nn2","hadoop02:9000");
    conf.set("dfs.client.failover.proxy.provider.qianfeng","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    //3、创建一个job对象
    Job job = Job.getInstance(conf,"wordCount");

    //4、描述对象
    //5、设置Job的执行路径
    job.setJarByClass(wordCount.class);

    //6、设置mapTask调用的业务逻辑类
    job.setMapperClass(WCMapper.class);

    //7、设置map端数据输出的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    //8、设置mapTask调用的业务类
    job.setReducerClass(WCReducer.class);

    //9、设置reduce端的数据的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    //10、设置Job的输入文件的路径
    FileInputFormat.setInputPaths(job,new Path(args[0]));

    //11、设置Job的输出文件的路径
    FileOutputFormat.setOutputPath(job,new Path(args[1]));

    //12、提交job

// job.submit();
boolean b = job.waitForCompletion(true);

    System.exit(b?0:1);
}

}

}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Mapreduce的简单实现和步骤 的相关文章

随机推荐