编程实现MapReduce操作

2023-05-16

文章目录

  • 一、MapReduce的WordCount应用
  • 二、Partitioner 操作
  • 三.排序实现
  • 四.二次排序实现
  • 五、hadoop实现
  • 六、出现的问题与解决方案



提示:以下是本篇文章正文内容,下面案例可供参考

一、MapReduce的WordCount应用

1.创建maven工程

在这里插入图片描述
⒉.配置工件坐标
在这里插入图片描述
3. 配置pom依赖文件
在这里插入图片描述
4.导入Hadoop配置文件
在这里插入图片描述
5.导入所需要的包
在这里插入图片描述

6.编写map函数方法;
 public static class MyMapper extends Mapper<Object,Text,Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key,Text value,Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
7. 编写reduce函数的方法
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
8. main函数的调用创建job类
public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/zhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "WordCountApp");
        job.setJarByClass(WordCountApp.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(INPUT_PATH);
        FileInputFormat.addInputPath(job, inputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(OUTPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
wordcount代码总和
package mapreduce;
import mapreduce.PartitionerApp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
public class WordCountApp {
    public static class MyMapper extends Mapper<Object,Text,Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key,Text value,Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable result = new IntWritable();
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/zhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "WordCountApp");
        job.setJarByClass(WordCountApp.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(INPUT_PATH);
        FileInputFormat.addInputPath(job, inputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(OUTPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

二、Partitioner 操作

1.自定义Partitoner在 MapReduce 中的应用
  private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]),new IntWritable(Integer.parseInt(s[1])));
        }
    }
    private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum= 0;
            for (IntWritable val : value){
                sum += val.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
2.编写MyPartitioner方法
public static class MyPartitioner extends Partitioner<Text, IntWritable>{
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitons){
            if (key.toString().equals("xiaomi"))
                return 0;
            if (key.toString().equals("huawei"))
                return 1;
            if (key.toString().equals("iphone7"))
                return 2;
            return 3;
        }
    }
3.编写main函数
public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/partitionerzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputpartitionerzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "PartitionerApp");
        job.setJarByClass(PartitionerApp.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(INPUT_PATH);
        FileInputFormat.addInputPath(job, inputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(OUTPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
Partitioner完整代码
package mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
import java.net.URI;
public class PartitionerApp {
    private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            String[] s = value.toString().split("\t");
            context.write(new Text(s[0]),new IntWritable(Integer.parseInt(s[1])));
        }
    }
    private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum= 0;
            for (IntWritable val : value){
                sum += val.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
    public static class MyPartitioner extends Partitioner<Text, IntWritable>{
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitons){
            if (key.toString().equals("xiaomi"))
                return 0;
            if (key.toString().equals("huawei"))
                return 1;
            if (key.toString().equals("iphone7"))
                return 2;
            return 3;
        }
    }
    public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/partitionerzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputpartitionerzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "PartitionerApp");
        job.setJarByClass(PartitionerApp.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path(INPUT_PATH);
        FileInputFormat.addInputPath(job, inputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path(OUTPUT_PATH);
        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

三.排序实现

1.使用MapReduce API实现排序
public static class MyMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }
        public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
             private static IntWritable data = new IntWritable(1);
             public void reduce(IntWritable key,Iterable<IntWritable> values, Context context)
                     throws IOException, InterruptedException {
                     for (IntWritable val : values){
                         context.write(data, key);
                         data = new IntWritable(data.get()+ 1);
                     }
        }
    }
2.编写main函数
public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/sortzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputsortzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "SortApp");
        job.setJarByClass(SortApp.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
sort总体代码
package mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class SortApp {
    public static class MyMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }
        public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
             private static IntWritable data = new IntWritable(1);
             public void reduce(IntWritable key,Iterable<IntWritable> values, Context context)
                     throws IOException, InterruptedException {
                     for (IntWritable val : values){
                         context.write(data, key);
                         data = new IntWritable(data.get()+ 1);
                     }
        }
    }
    public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/sortzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputsortzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job = Job.getInstance(conf, "SortApp");
        job.setJarByClass(SortApp.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

四.二次排序实现

1.编写IntPair方法
public static class IntPair implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;
        public void set(int left, int right) {
            first = left;
            second = right;
        }
        public int getFirst() {
            return first;
        }
        public int getSecond() {
            return second;
        }
        @Override
        public void readFields(DataInput in) throws IOException{
            first = in.readInt();
            second = in.readInt();
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }
        @Override
        public int hashCode() {
            return first+"".hashCode() + second+"".hashCode();
        }
        @Override
        public boolean equals(Object right){
            if (right instanceof IntPair){
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            }else {
                return  false;
            }
        }
        @Override
        public int compareTo(IntPair o){
            if (first != o.first){
                return first - o.first;
            } else if (second != o.second){
                return second - o.second;
            }else {
                return 0;
            }
        }
    }
2.编写secondsort
public static class MyMapper extends Mapper<LongWritable, Text, IntPair, IntWritable>{
        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();
        @Override
        public void map(LongWritable inKey, Text inValue,Context context)
                throws IOException, InterruptedException{
            StringTokenizer itr = new StringTokenizer(inValue.toString());
            int left = 0;
            int right = 0;
            if (itr.hasMoreTokens()){
                left = Integer.parseInt(itr.nextToken());
                if (itr.hasMoreTokens()) {
                    right = Integer.parseInt(itr.nextToken());
                }
                key.set(left, right);
                value.set(right);
                context.write(key, value);
            }
        }
    }
    public static class GroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2){
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,b2,s2, Integer.SIZE/8);
        }
        @Override
        public int compare(IntPair o1, IntPair o2) {
            int first1 = o1.getFirst();
            int first2 = o2.getFirst();
            return first1 - first2;
        }
    }
    public static class MyReducer extends Reducer<IntPair, IntWritable, Text, IntWritable>{
        private static final Text SEPARATOR=new Text("-------------");
        private final Text first = new Text();
        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
                throws IOException,InterruptedException {
            context.write(SEPARATOR, null);
            first.set(Integer.toString(key.getFirst()));
            for(IntWritable value: values){
                context.write(first, value);
            }
        }
    }
3.编写main函数
public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/secondsortzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputsecondsortzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job =Job.getInstance(conf, "SecondarySortApp");
        job.setJarByClass(SecondarySortApp.class);
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
二次排序完整代码
package mapreduce;
import org.apache.hadoop.io. WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySortApp {
    public static class IntPair implements WritableComparable<IntPair> {
        private int first = 0;
        private int second = 0;
        public void set(int left, int right) {
            first = left;
            second = right;
        }
        public int getFirst() {
            return first;
        }
        public int getSecond() {
            return second;
        }
        @Override
        public void readFields(DataInput in) throws IOException{
            first = in.readInt();
            second = in.readInt();
        }
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(first);
            out.writeInt(second);
        }
        @Override
        public int hashCode() {
            return first+"".hashCode() + second+"".hashCode();
        }
        @Override
        public boolean equals(Object right){
            if (right instanceof IntPair){
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            }else {
                return  false;
            }
        }
        @Override
        public int compareTo(IntPair o){
            if (first != o.first){
                return first - o.first;
            } else if (second != o.second){
                return second - o.second;
            }else {
                return 0;
            }
        }
    }
    public static class MyMapper extends Mapper<LongWritable, Text, IntPair, IntWritable>{
        private final IntPair key = new IntPair();
        private final IntWritable value = new IntWritable();
        @Override
        public void map(LongWritable inKey, Text inValue,Context context)
                throws IOException, InterruptedException{
            StringTokenizer itr = new StringTokenizer(inValue.toString());
            int left = 0;
            int right = 0;
            if (itr.hasMoreTokens()){
                left = Integer.parseInt(itr.nextToken());
                if (itr.hasMoreTokens()) {
                    right = Integer.parseInt(itr.nextToken());
                }
                key.set(left, right);
                value.set(right);
                context.write(key, value);
            }
        }
    }
    public static class GroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2){
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,b2,s2, Integer.SIZE/8);
        }
        @Override
        public int compare(IntPair o1, IntPair o2) {
            int first1 = o1.getFirst();
            int first2 = o2.getFirst();
            return first1 - first2;
        }
    }
    public static class MyReducer extends Reducer<IntPair, IntWritable, Text, IntWritable>{
        private static final Text SEPARATOR=new Text("-------------");
        private final Text first = new Text();
        @Override
        public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
                throws IOException,InterruptedException {
            context.write(SEPARATOR, null);
            first.set(Integer.toString(key.getFirst()));
            for(IntWritable value: values){
                context.write(first, value);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        String INPUT_PATH = "hdfs://192.168.10.111:9000/secondsortzhangguoqiang";
        String OUTPUT_PATH = "hdfs://192.168.10.111:9000/outputsecondsortzhangguoqiang";
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        if (fileSystem.exists(new Path(OUTPUT_PATH))) {
            fileSystem.delete(new Path(OUTPUT_PATH), true);
        }
        Job job =Job.getInstance(conf, "SecondarySortApp");
        job.setJarByClass(SecondarySortApp.class);
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

五、hadoop实现

1.生成可执行的jar包
在这里插入图片描述
2.将jar包上传 到/home/admin/file/mapreduce
在这里插入图片描述

3.上传hello.txt,par_1.txt和part_2.txt,sort.txt,second.txt文件到hadoop HDFS
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
3.运行jar包
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

六、出现的问题与解决方案

问题1:在eclipse或idea中终端打开失败无法导出WordCountjar包
解决:在cmd中进入该项目文件夹,输入命令,导出的jar包在target目录下
问题2:partitionerjar包运行时报错java.lang.ArrayIndexOutOfBoundsException: 1
解决:这是因为在txt文件中使用了空格隔开数据,代码中使用的是tab分隔,将txt中的空格换为tab即可(数据之间只能用一个tab隔开)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

编程实现MapReduce操作 的相关文章

  • SDN控制器Ryu、Floodlight、OpenDayLight的安装以及Mininet连接

    文章中文件名内的xxx需要替换成自己文件的具体版本 ubuntu下安装之前可以先用 sudo apt cache madison soft name查看一下apt安装的版本 xff0c 如果版本合适的话用apt更加方便 Ryu控制器 Ryu
  • 调试时出现:undefined Expecting 'EOF','}',',',']', got STRING以下错误的解决方法

    网上查了很多跟此问题相关的答案 xff0c 都没彻底解决 xff0c 今天亲自遇到这个问题和解决方法了 xff0c 特写下来 问题描述 xff1a 代码是这样的 xff1a VM523 1 undefined Expecting EOF g
  • tensorflow详细安装过程

    我电脑安装的python是3 7 4的 xff0c 所以python如果版本不一样的话 xff08 不是3 7的 xff09 xff0c 下边的内容不建议完全参考 xff0c 可以适当参考 主要是注意很多numpy和models与你安装的t
  • FOC——无刷电机的简单驱动

    文章目录 一 什么是无刷电机 xff1f 1 长什么样 xff1f 2 怎么工作 xff1f 二 试着让它转起来1 STM32CubeMX配置2 keil Clion代码编写3 结果分析 参考的资料 写这个是为了记录学习过程 xff0c 为
  • C++ 链表(list)使用简述

    目录 1 有关函数的作用 2 测试用例 C 43 43 STL 库的 list 容器是一个双向链表 包含在头文件 lt list gt 中 1 有关函数的作用 list 本身 xff1a list lt type gt li 定义一个参数类
  • KEIL5打开KEIL4工程的方法

    解决的问题 xff1a 当使用KEIL5打开KEIL4工程的时候会提示让你下载支持包 xff0c 可以参考以下流程安装你的KEI5版本对应的支持包 步骤 xff1a 一 打开KEIL5 xff0c 点击左上角的HELP About uVis
  • Ubuntu shell脚本自动输入密码

    Ubuntu脚本实现自动输入密码 执行shell脚本的时候若遇到权限问题 xff0c 会需要手动输入密码 xff0c 自动化脚本就变得加个引号了 解决方法 描述太麻烦 xff0c 举例说明 xff1a 想要获取权限删除文件 密码为00000
  • H3C华三链路聚合的原理及配置

    1 链路聚合的作用 xff1a 将多条物理链路捆绑在一起形成一条以太网逻辑链路 xff0c 实现增加链路带宽 的目的 xff0c 同时这些捆绑在一起的链路通过相互动态备份 xff0c 可以有效地提高链路的可靠性 2 聚合模式 xff1a 静
  • 【算法】电机-几种直流无刷电机控制优化算法

    除了电机控制里经常使用的经典PID控制方法 xff0c 目前还存在几种常用的优化控制算法在这里给大家普及一下 xff0c 明白它们的大概原理及相互之间的区别 目录 xff1a 1 经典PID控制 2 模糊控制 3 滑膜变结构控制 1 经典P
  • 记一次 jenkins 构建失败 “Cannot find module ‘core-js/modules/es.promise.finally‘”

    目录 前言排查过程解决方案总结 前言 这是一次前端项目构建失败的惨案 xff0c 项目已经部署很久了 xff0c 一直相安无事 因为开发更新了代码 xff0c 在构建的时候报错 xff1a main js Cannot find modul
  • VSCode主题颜色的更改,让字体变暗一些,不那么刺眼(类IDEA风)

    VSCode默认主题就是Dark 43 直接打开settings json文件更改 1 工作区界面的颜色更改 xff0c 主要是背景色2 代码颜色3 总的代码 1 工作区界面的颜色更改 xff0c 主要是背景色 在这一部分 xff0c 我主
  • Error committing transaction. Cause: org.apache.ibatis.executor.ExecutorException: Cannot commit, t

    一 出现这个问题是因为在你的事务提交的时候 关闭sqlSession会话已经执行了 导致会话无法提交 解决方法就是先提交事务 再关闭流操作
  • 考研复试——C、C++

    文章目录 C语言 C 43 43 1 C和C 43 43 的区别 xff1f 2 封装 继承 多态分别是什么意思 xff1f 3 new delete malloc free的关系 xff1f 4 什么是引用 xff0c 引用和指针的区别
  • 运行ROS程序与CMakeList文件

    一 图概念概述 Nodes 节点 一个节点即为一个可执行文件 xff0c 它可以通过ROS与其它节点进行通信 Messages 消息 xff0c 消息是一种ROS数据类型 xff0c 用于订阅或发布到一个话题 Topics 话题 节点可以发
  • OpenvSLAM编译与安装

    OpenvSLAM编译与安装 1 安装依赖2 安装Eigen3 安装Opencv 参考另一篇安装opencv3 4 9 4 安装自定义DBoW25 安装g2o6 安装PangolinViewer7 编译openvSLAM8 检查是否编译成功
  • torch.triu 与 numpy.triu 函数

    triu 61 tri angle u p xff08 我猜的 xff09 顾名思义 xff0c 这个函数的作用相同 xff0c 都是返回上三角矩阵 xff0c 定义分别如下 xff1a numpy triu m k torch triu
  • OpenVSLAM-全局优化模块(global optimization module)

    开源SLAM框架学习 OpenVSLAM源码解析 xff1a 全局优化模块 xff08 global optimization module xff09 xff1a 回环检测 pse graph优化 global BA优化 这篇博客主要介绍
  • Ubuntu20.04系统安装与基本配置

    Ubuntu20 04安装与配置 一 ubuntu20 04 5 系统重装 xff08 联想y7000p xff09 步骤一 xff1a 在 WIN10系统下创建空白磁盘分区步骤二 xff1a 镜像文件写入U盘步骤三 xff1a U 盘安装
  • 【向日葵】连接linux版向日葵出现瞬间断开的情况

    向日葵 连接linux版向日葵出现瞬间断开的情况 问题描述 xff1a 连接到Linux时就会在连接完成的瞬间出现连接已断开 xff0c 我的Linux发行版是Ubuntu18 04 解决 xff1a 这个问题出现的原因是向日葵不支持Ubu
  • 51单片机数码管显示数字及小数点

    51单片机数码管显示 共阴极 1 先看一下显示的结果 源代码 span class token macro property span class token directive hash span span class token dir

随机推荐

  • 数据结构之循环队列基本操作(c语言)

    队列 xff1a 队列是一种先进先出 First In First Out 的线性表 它只允许在表的一端进行插入 xff0c 在另一端删除元素 允许插入的一端成为队尾 xff0c 允许删除的一端成为队头 循环队列的顺序表示和实现 xff1a
  • 数据结构——先序遍历的顺序创建二叉链表并中序遍历(C语言)

    先序遍历的顺序创建二叉链表并中序遍历 1 算法步骤 xff1a 1 xff09 扫描数字序列 xff0c 读入数字n 2 xff09 如果n是一个 0 数字 xff0c 则表明该二叉树为空树 xff0c 即T 61 NULL 否则执行一下操
  • 51单片机的系统扩展之8255A

    8255 xff1a 8255芯片是Intel公司生产的可编程并行I O接口芯片 xff0c 有3个8位并行I O口 具有3个通道3种工作方式的可编程并行接口芯片 xff08 40引脚 xff09 其各口功能可由软件选择 xff0c 使用灵
  • ESP8266一直闪蓝灯,不停复位的解决办法

    问题 xff1a 在一次下载中无意间将下载的文件选错 xff0c 再次下载完成后就突然一直闪蓝灯 xff0c 不停复位 这并不是ESP8266模组坏了 解决办法 xff1a 1 我们平常下载程序选择eagle flash bin和eagle
  • Markdown快速入门

    Markdown快速入门 1 代码块 96 96 96 c include lt iostream gt using namespace std int main cout lt lt 34 hello world 34 lt lt end
  • VSCode 如何让去掉 Pylint 展示的花里胡哨的警告

    Pylint 是一个 python 的语法检测器 xff0c 提升编程效率的同时其带来的花里胡哨的警告也是真让人看着难受 xff0c 就像下面这花花绿绿的波浪线 xff1a 这些警告种类极其丰富 xff0c 比如下面这样 xff1a Met
  • 01_HC-SR04超声波传感器(GPIO中断+定时器方式)

    1 简介 xff1a HC SR04 超声波测距模块可提供 2cm 400cm 的非接触式距离感测功能 xff0c 测 距精度可达高 3mm xff1b 模块包括超声波发射器 接收器与控制电路 2 工作原理 xff1a 1 采用 IO 口
  • Faster R-CNN论文解读

    文章目录 AbstractIntroduction缘由RPN训练方案 Faster R CNN整体流程Conv layersRPNclsreganchorTranslation Invariant AnchorsMuti Scale Anc
  • c语言输入字符时控制符%c前加空格的原因解释

    文章目录 一 前景知识1 缓冲区2 标准输入流 二 scanf语句的执行1 scanf对于整形 d的输入2 scanf对于字符 c的输入 在编一个代码时偶然间发现一个知识盲点 用scanf语句输入字符时需要在控制符 c前加空格 在解释相关这
  • 解决c++中头文件重复包含的问题

    前言 c 43 43 项目中经常会使用到自己定义的一些函数和接口 xff0c 我们通常在头文件中包含进来 xff0c 但这样存在头文件被多次包含的危险 xff0c 导致编译报错 xff0c 以下介绍了几种常用的解决方法 一 采用宏定义的方法
  • 华为交换机5700 SSH配置

    一 在本地设备服务端生成密匙 Huawei rsa local span class token operator span span class token keyword key span span class token operat
  • 函数模板、类模板

    泛型编程 泛型编程 xff1a 编写与类型无关的通用代码 xff0c 是代码复用的一种手段 模板是泛型编程的基础 函数模板 函数模板代表了一个函数家族 xff0c 该函数模板与类型无关 xff0c 在使用时被参数化 xff0c 根据实参类型
  • STM32 第4讲 STM32原理图

    本文为学习正点原子得笔记 xff0c 主要讲解STM32原理图绘制 xff0c 主要由最小系统 43 IO口分布两步完成 引脚分布 STM引脚分类 xff1a 电源引脚晶振引脚复位引脚下载引脚 xff1a JTAG SWD 串口 JTAG
  • STM32 第12讲 GPIO:结构/8种工作模式/寄存器/驱动模型/配置步骤/实验

    文章目录 GPIO简介GPIO特点电气特性GPIO引脚分布 GPIO8种工作模式GPIO的基本结构8种工作模式 GPIO寄存器GPIO端口模式寄存器 xff08 GPIOx MODER xff09 GPIO端口输出类型寄存器 xff08 G
  • PID/LQR/MPC自行总结使用

    PID LQR MPC自行总结使用 自学控制相关知识 xff0c 已经一年多了 xff0c 现在回头看看还是有很多模糊不明确的地方 xff0c 准备借此机会进行总结一下 xff0c 第一次写博客 xff0c 如果错误和不合理之处 xff0c
  • 对 torch.nn.Linear 的理解

    torch nn Linear 是 pytorch 的线性变换层 xff0c 定义如下 xff1a Linear in features int out features int bias bool 61 True device Any N
  • rosdep init 错误解决终极方法(药到病除)

    rosdep init 错误解决方法 一 安装ROS执行以下指令时报错二 原因三 解决办法1 查询IP地址2 将IP地址添加进文件3 重新执行指令 成功解决 xff01 xff01 xff01 一 安装ROS执行以下指令时报错 sudo r
  • Intel Realsense T265 在ubuntu下的环境配置

    Intel Realsense T265 在ubuntu下的环境配置 一 T265介绍二 realsense SDK 安装配置1 注册服务器的公钥2 将服务器添加到存储库列表3 安装所需的库 xff0c 开发者和调试包5 插上T265打开
  • SLAM图优化一

    前言 SLAM问题的处理方法主要分为滤波和图优化两类 滤波的方法中常见的是扩展卡尔曼滤波 粒子滤波 信息滤波等 xff0c 熟悉滤波思想的同学应该容易知道这类SLAM问题是递增的 实时的处理数据并矫正机器人位姿 比如基于粒子滤波的SLAM的
  • 编程实现MapReduce操作

    文章目录 一 MapReduce的WordCount应用二 Partitioner 操作三 xff0e 排序实现四 xff0e 二次排序实现五 hadoop实现六 出现的问题与解决方案 提示 xff1a 以下是本篇文章正文内容 xff0c