二次排序的需求说明
在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。
1.png
测试的文件数据
a 5
a 1
a 7
a 9
b 3
b 8
b 10
c 2
c 4
c 3
第一种实现思路
直接在reduce端对分组后的values进行排序。
- map关键代码
public class myMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("before Mapper <k1,v1>:"+key+"-"+value);
String line = value.toString();
String[] str = line.split(" ");
context.write(new Text(str[0]),new IntWritable(Integer.parseInt(str[1])));
}
}
- reduce关键代码
-
public class myReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
List<Integer> valuesList = new ArrayList<Integer>();
// 取出value
for(IntWritable value : values) {
valuesList.add(value.get());
}
// 进行排序
Collections.sort(valuesList);
for(Integer value : valuesList) {
context.write(key, new IntWritable(value));
}
}
}
App关键代码
public class myApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
Job job = Job.getInstance(conf);
//设置job的各种属性
job.setJobName("myAppApp"); //作业名称
job.setJarByClass(myApp.class); //搜索类
//添加输入路径
FileInputFormat.addInputPath(job,new Path("F:\\mr\\secondsort\\input1"));
//设置输出路径
FileOutputFormat.setOutputPath(job,new Path("F:\\mr\\secondsort\\output1"));
job.setMapperClass(myMapper.class); //mapper类
job.setReducerClass(myReducer.class); //reducer类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //
job.waitForCompletion(true);
}
}
其输出:
a 1
a 5
a 7
a 9
b 3
b 8
b 10
c 2
c 3
c 4
需要注意的是,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。还有就是,在reduce端对values进行迭代的时候,不要直接直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。
第二种实现思路
用二次排序来实现
二次排序,从字面上可以理解为在对key排序的基础上对key所对应的值value排序,也叫辅助排序。一般情况下,MapReduce框架只对key排序,而不对key所对应的值排序,因此value的排序经常是不固定的。但是我们经常会遇到同时对key和value排序的需求,例如Hadoop权威指南中的求一年的高高气温,key为年份,value为最高气温,年份按照降序排列,气温按照降序排列。还有水果电商网站经常会有按天统计水果销售排行榜的需求等等,这些都是需要对key和value同时进行排序。
根据相关博客加上书本的理解:
得到如下的流程:
input file ---> split ---> recordreader(生成键值对)---> 形成复合键ComboKey(key和value的复合键) ---> 分区 (setPartitionerClass设置分区函数--可以是自定义的分区函数,这里主要根据的是组合键的第一个字段(key)进行分区) ----> 在每个分区内进行排序(分区内部排序,setSortComparatorClass设定特定的排序对比器进行排序,实际上这已经算是进行了一次二次排序) ---> shuffle阶段--->内部排序(shuffle阶段从不同节点读取了相应的map输出文件,所以在此处进行第二次排序,同样的setSortComparatorClass设定特定的排序对比器进行排序,进行第二次排序)--->reduce阶段进行分组(会对键值相同的项进行分组操作,其默认操作的键。对于我们生产的键值对<key1,value1>,key1是一个复合键值对,我们对他的操作是针对复合键值对key1的第一个值为准的,setGroupingComparatorClass设定的分组函数)---> 执行reduce函数--->输出结果。
具体事例:
事例文件如上:
posted @
2019-02-21 22:06
流氓小伙子 阅读(
...) 评论(
...)
编辑
收藏
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)