目录
应用场景
什么是二次排序
怎样实现二次排序
示例代码
应用场景
假如输入文件内容如下:
a,1
z,3
b,2
a,100
a,3
b,1
要求经过MapReduce处理后,key升序排列,相同key的vaule也升序排列,如下:
a,1
a,3,
a,100
b,1
b,2
z,3
什么是二次排序
二次排序是指我们对key进行排序后,同时也需要对value中的某个字段进行排序。实现二次排序的关键在于将初始的key与待排序字段组合成自定义类型的数据类型,将其作为新的key,利用mapreduce自动对key进行排序的原理,完成二次排序。在上面的示例输入中,初始key为字母列,待排序字段为数字列。
怎样实现二次排序
由初始key字段与待排序字段组成的自定义的数据类型需要实现WritableComparable接口,WritableComparable接口继承自Writable接口和Comparable接口。Writable接口主要是用来实现序列化和反序列化,Comparable是Java中用于比较的接口。
在自定义的数据类型中,需要将初始key和待排序字段分别定义为变量,添加构造方法和get()/set()方法,同时重写序列化和反序列的方法,注意方法中数据类型要一致,最后重写比较方法。
将上面自定义好的数据类型作为map输出的key,value还是初始的value,并输出。示例内容中,经过此步处理后,Map输出内容为
((a,1),1)
((z,3),3)
((b,2),2)
((a,100),100)
((a,3),3)
((b,1),1)
Map输出之后在reduce之前,还需要对map端输出的值进行处理,如下:
1,分区(partition)。因为使用了组合key作为新的key,如果还用之前的默认分区方法,在存在多个reduce 时,会将数据分散开,不符合要求,所以要 需要使用自定义的分区,然后按照初始的key进行区分,这样才能使结果符合要求。
自定义分区类需要继承Partitioner类,重写getPartition方法;在Job中通过setPartitionerClass设置使用自定义的分区类。
2,分组(group),需要使用自定义group来处理我们需要的key,按照组合key中第一个字段,即初始key进行分组,这样得到的数据就是有序而且全部的.
自定义分组类需要实现原生的RawComparator接口,RawComparator是一个原生的优化接口类,它只是简单的提供了数据流中的简单数据比较方法,此接口并没有被多数的衍生类所实现,最常用的实现类为WritableComparator,多数情况下是作为实现Writable接口的类的内部类,提供序列化字节的比较。RawComparator有两个比较方法,一个是对象间的比较,一个是字节数组的比较。
示例代码
(1) 创建map类,使用自定义数据类型作为输出结果的key,并实现map方法 ,设置组合key的值写入到context中
public static class SecondarySortMapper extends
Mapper<LongWritable,Text,PairWritable,IntWritable>{}
/**......省略....**/
String lineValue = value.toString();
String[] strs = lineValue.split(",") ;
PairWritable mapOutputKey = new PairWritable ();
mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
mapOutputValue.set(Integer.valueOf(strs[1]));
/**.....省略.....**/
context.write(mapOutputKey, mapOutputValue);
(2) 创建reduce类,使用自定义数据类型作为输入的key,在reduce方法中,将输入key的第一个字段值,即初始key作为输出结果的key,循环输入的列表,将完成排序的key/value输出到上下文中。
/**部分示例代码**/
public static class SecondarySortReducer extends
Reducer<PairWritable,IntWritable,Text,IntWritable>{}
PairWritable outputKey = new outputKey();
outputKey.set(key.getFirst());
for(IntWritable value : values){
context.write(outputKey, value);
}
(3) 设置job类,注意map输出类型和reduce类型均为自定义数据类型
/**部分示例代码**/
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
job.setMapperClass(SecondarySortMapper.class);
job.setMapOutputKeyClass(PairWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
/**设置分区类**/
job.setPartitionerClass(FirstPartitioner.class);
/**设置分组类**/
job.setGroupingComparatorClass(FirstGroupingComparator.class);
(4) 实现自定义数据类型PairWritable,实现WritableComparable接口,
public class PairWritable implements WritableComparable<PairWritable> {
private String first;
private int second;
public PairWritable() { }
public PairWritable(String first, int second) {
this.set(first, second);
}
public void set(String first, int second) {
this.setFirst(first);
this.setSecond(second);
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
/**Get和set方法都用到了Integer的最大值,这是一种保持数据同为正数或同为负数的常用方法,避免出现正数和负数进行比较的情况**/
public int getSecond() {
return second - Integer.MAX_VALUE;
}
public void setSecond(int second) {
this.second = second + Integer.MAX_VALUE;
}
/**序列化和反序列化的方法,注意数据类型要前后对应**/
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
/**比较对象的大小,返回结果为int类型,先对第一个字段进行比较,如果相同,继续比较第二个字段**/
public int compareTo(PairWritable o) {
int comp =this.first.compareTo(o.getFirst()) ;
/**如果不相等**/
if(0 != comp){
return comp ;
}
/**相等**/
return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;
}
}
(5) 自定义分组类FirstPartitioner,如果第一个字段相同则分为一组
/**部分示例代码**/
public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {
@Override
public int getPartition(PairWritable key, IntWritable value,
int numPartitions) {
/**使用哈希码进行分组**/
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
(6) 自定义分组类FirstGroupingComparator,如果第一个字段相同则为一组
/**部分示例代码**/
public class FirstGroupingComparator implements RawComparator<PairWritable> {
/**比较对象值,我们需要的是对组合key中的第一个字段进行比较**/
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
/**比较字节数组,因为我们输出类型为int,占4个字节,所以用数组总长度l减去4,即为需要的字节长度**/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)