前面的实例都是在数据上进行一些简单的处理,为进一步的操作打基础。单表关联这个实例要求从给出的数据中寻找到所关心的数据,它是对原始数据所包含信息的挖掘。下面进入这个实例。
分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。连接结果中除去连接的两列就是所需要的结果----grandchild,grandparent表。要用MapReduce实现这个实例,首先要考虑如何实现表的自连接,其次就是连接列的设置,最后是结果的整理。考虑到MapReduce的shuffle过程会将相同的key值放在一起,所以可以将Map结果的key值设置成待连接的列,然后列中相同的值自然就会连接在一起了。再与最开始的分析联系起来:要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在Map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表信息,比如在value的String最开始处加上字符1表示左表,字符2表示右表。这样在Map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。在Reduce接收到的连接结果中,每个key的value-list就包含了grandchild和grandparent关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class STjoin {
public static int time = 0;
// Map 将输入分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要
// 注意的是在输出的value中必须加上左右表区别标志
public static class Map extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value,Mapper<Object, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// super.map(key, value, context);
String childname = new String();
String parentname = new String();
String relationtype = new String();
String line = value.toString();
int i=0;
while(line.charAt(i)!=' '){
i++;
}
String [] values = {line.substring(0,i),line.substring(i+1)};
if(values[0].compareTo("child") !=0 ){
childname = values[0];
parentname = values[1];
relationtype = "1"; // 左右表区分标志
context.write(new Text(values[1]), new Text(relationtype+"+"+childname+"+"+parentname)); // 左表
relationtype = "2";
context.write(new Text(values[0]), new Text(relationtype+"+"+childname+"+"+parentname)); // 右表
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// super.reduce(arg0, arg1, arg2);
if(time==0){ // 输出表头
context.write(new Text("grandchild"), new Text("grandparent"));
time++;
}
int grandchildnum = 0;
String grandchild[] = new String[10];
int grandparentnum = 0;
String grandparent[] = new String[10];
Iterator ite = values.iterator();
while(ite.hasNext()){
String record = ite.next().toString();
int len = record.length();
int i = 2;
if(len == 0) continue;
char relationtype = record.charAt(0);
String childname = new String();
String parentname = new String();
// 获取value-list中value的child
while(record.charAt(i)!='+'){
childname = childname + record.charAt(i);
i++;
}
i = i+1;
// 获取value-list中value的parent
while(i<len){
parentname = parentname+record.charAt(i);
i++;
}
// 左表,取出child放入grandchild
if(relationtype == '1'){
grandchild[grandchildnum] = childname;
grandchildnum++;
}else { // 右表,取出parent放入grandparent
grandparent[grandparentnum] = parentname;
grandparentnum++;
}
}
// grandchild 和 grandparent 数组求笛卡尔积
if(grandparentnum !=0 && grandchildnum !=0){
for(int m=0;m<grandchildnum;m++){
for(int n=0; n<grandparentnum;n++){
context.write(new Text(grandchild[m]), new Text(grandparent[n])); // 输出结果
}
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2){
System.out.println("Usage:wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"sing table join");
job.setJarByClass(STjoin.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}