Hadoop MapReduce Programming: Implementing a Reduce-Side Join

2023-11-04

1. Data Preparation

movies.dat record format: movieid::moviename::movietype

ratings.dat record format: userid::movieid::rating::timestamp
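Both files use :: as the field separator, in the style of the MovieLens data sets. For illustration only, a hypothetical movie record and rating record might look like:

1::Toy Story (1995)::Animation|Children's|Comedy
1::1193::5::978300760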

2. Mapper Development
1) Define the necessary fields
public class RatingMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private String filename = "";                     // name of the file the current split comes from
    private final IntWritable mk = new IntWritable(); // output key: movieid
    private final Text mv = new Text();               // output value: tagged record
2) Override setup to get the file name of the current split
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    FileSplit inputSplit = (FileSplit) context.getInputSplit(); // the split this map task is processing
    filename = inputSplit.getPath().getName();                  // e.g. movies.dat or ratings.dat
}
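Note: the cast to FileSplit assumes the job uses a plain FileInputFormat, as it does here. If the two inputs were wired up through MultipleInputs instead, the mapper would receive a wrapper split and this cast would fail at runtime.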
3) The map method: use the file name to tag which table each record comes from
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String[] lines = value.toString().split("::");
    if (filename.equals("movies.dat")) { // movieid::moviename::movietype
        mk.set(Integer.parseInt(lines[0].trim()));
        mv.set("M" + lines[1] + "\t" + lines[2]); // "M" marks a movie record
    } else { // ratings.dat: userid::movieid::rating::timestamp
        mk.set(Integer.parseInt(lines[1].trim()));
        mv.set("R" + lines[0] + "\t" + lines[2] + "\t" + lines[3]); // "R" marks a rating record
    }
    context.write(mk, mv);
}
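Given the hypothetical sample lines above, the map phase would emit (movieid, tagged value) pairs such as:

(1, "MToy Story (1995)\tAnimation|Children's|Comedy")
(1193, "R1\t5\t978300760")

All movie and rating records that share a movieid therefore arrive in the same reduce call, where the leading tag says which table each record came from.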
3. Reducer Development
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RatingReduce extends Reducer<IntWritable, Text, IntWritable, Text> {
    private final Text mv = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        List<String> mlist = new ArrayList<>(); // movie records for this movieid
        List<String> rlist = new ArrayList<>(); // rating records for this movieid
        for (Text value : values) {
            String info = value.toString();
            if (info.startsWith("M")) {
                mlist.add(info.substring(1)); // strip the "M" tag
            } else {
                rlist.add(info.substring(1)); // strip the "R" tag
            }
        }
        // join: pair every movie record with every rating record for this movieid
        for (String movie : mlist) {
            for (String rating : rlist) {
                mv.set(movie + "\t" + rating);
                context.write(key, mv);
            }
        }
    }
}
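With the default TextOutputFormat, each joined record is written as the key, a tab, and the value. Continuing the hypothetical sample (the movie name is illustrative), the rating of movie 1193 by user 1 would come out roughly as a tab-separated line:

1193	One Flew Over the Cuckoo's Nest (1975)	Drama	1	5	978300760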
4. Driver Development
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RatingDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop"); // act as the hadoop user when talking to HDFS
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://mkmg/"); // only needed when the job reads from and writes to HDFS
        Job job = Job.getInstance(conf);

        job.setJarByClass(RatingDriver.class);

        job.setMapperClass(RatingMapper.class);
        job.setReducerClass(RatingReduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // both files go through the same mapper, which tells them apart by file name
        FileInputFormat.setInputPaths(job, new Path("D:/movie/ratings.dat"), new Path("D:/movie/movies.dat"));

        // delete the output directory if it already exists, otherwise the job fails on startup
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("D:/movie_out");
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
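To submit the job, the three classes can be packed into a jar and launched with the standard hadoop jar command; the jar name below is made up:

hadoop jar movie-join.jar RatingDriver

Because the input and output paths are hard-coded Windows paths, this particular Driver is set up for a local test run; against a cluster, the paths would point at HDFS instead.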
5. Summary
Drawbacks of the reduce-side join (it is best suited to joining two large tables):
1) Data skew: keys are rarely spread evenly across partitions, so some reduce tasks do far more work than others.
2) The reducer buffers the records for each key in in-memory lists, so a very large group can cause an OutOfMemoryError.
3) Reduce tasks have limited parallelism, which caps performance; a common rule of thumb is to use about the number of DataNodes * 0.95 reduce tasks.
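Following that rule of thumb, the reduce parallelism is set in the Driver before submission. A minimal sketch, assuming a hypothetical cluster with 10 DataNodes:

// hypothetical: 10 DataNodes, 10 * 0.95 ≈ 9 reduce tasks
job.setNumReduceTasks(9);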