假设有订单表t_order和商品表t_product两张数据库表,现在需要对它们进行关联查询。用SQL语句很容易实现:
-- Left-join every order to its product on pid; orders with no matching product keep NULL product columns.
select a.id,a.date,b.name,b.category_id,b.price
from t_order a left outer join t_product b
on a.pid = b.id
那么怎样用MapReduce来实现呢?思路是:将关联条件pid(即商品id;订单表与商品表是多对一关系)作为map输出的key,并让每条数据携带其来源文件的信息,这样两表中满足join条件的数据就会被发往同一个reduce task;在reduce中把订单数据与对应的商品数据拼接起来,最后写到一个文件中。
我们可以自定义一个Bean,在其中封装两张表的所有字段信息,并用一个flag字段标记数据来源(0表示订单,1表示商品);最后写出到文件时直接输出该Bean即可。
public class InfoBean implements Writable{
private String oid;
private String date;
private String pid;
private int amount;
private String pname;
private int category_id;
private int price;
private int flag;
public void write(DataOutput out) throws IOException {
out.writeUTF(oid);
out.writeUTF(date);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeInt(category_id);
out.writeInt(price);
out.writeInt(flag);
}
public void readFields(DataInput in) throws IOException {
this.oid=in.readUTF();
this.date=in.readUTF();
this.pid=in.readUTF();
this.amount=in.readInt();
this.pname=in.readUTF();
this.category_id=in.readInt();
this.price=in.readInt();
this.flag=in.readInt();
}
public void setInfoBean(String oid, String date, String pid, int amount, String pname, int category_id, int price,
int flag) {
this.oid = oid;
this.date = date;
this.pid = pid;
this.amount = amount;
this.pname = pname;
this.category_id = category_id;
this.price = price;
this.flag = flag;
}
@Override
public String toString() {
return "oid=" + oid + ", date=" + date + ", pid=" + pid + ", amount=" + amount + ", pname=" + pname
+ ", category_id=" + category_id + ", price=" + price ;
}
public InfoBean() {
}
get、set
}
/**
 * Reduce-side join of order records with product records on the product id (pid).
 *
 * <p>Input files whose names start with {@code "order"} are parsed as
 * {@code oid,date,pid,amount}. Every other file is parsed as
 * {@code pid,pname,category_id,price}.
 *
 * <p>The mapper keys every record by pid, so each reduce call sees the product for
 * that pid (if any) together with all orders referencing it. It emits one joined
 * {@link InfoBean} per order.
 *
 * <p>Usage: {@code MapReduceJoin <input path> <output path>}
 */
public class MapReduceJoin {

    /** Tags each input line with its source table (via the bean's flag) and keys it by pid. */
    static class MapReduceJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        // Reused across map() calls instead of allocating per record.
        private final InfoBean bean = new InfoBean();
        private final Text text = new Text();
        // Whether this task's input split comes from an order file. A FileSplit
        // belongs to a single file, so this is resolved once in setup().
        private boolean isOrderFile;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            isOrderFile = inputSplit.getPath().getName().startsWith("order");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Skip blank lines (e.g. a trailing empty line) instead of failing the task
            // with an ArrayIndexOutOfBoundsException or NumberFormatException.
            if (line.trim().isEmpty()) {
                return;
            }
            String[] fields = line.split(",");
            String pid;
            if (isOrderFile) {
                // order line: oid,date,pid,amount  -> flag 0
                pid = fields[2];
                bean.setInfoBean(fields[0], fields[1], pid,
                        Integer.parseInt(fields[3]), "", 0, 0, 0);
            } else {
                // product line: pid,pname,category_id,price  -> flag 1
                pid = fields[0];
                bean.setInfoBean("", "", pid,
                        0, fields[1], Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), 1);
            }
            text.set(pid);
            context.write(text, bean);
        }
    }

    /** Copies the product columns for a pid onto every order with that pid. */
    static class MapReduceJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<InfoBean> beans,
                Context context) throws IOException, InterruptedException {
            // Stays a default bean when no product row exists for this pid, so such
            // orders are still emitted (left-outer-join semantics). If several product
            // rows share a pid, the last one seen wins.
            InfoBean productBean = new InfoBean();
            List<InfoBean> orderList = new ArrayList<>();
            for (InfoBean bean : beans) {
                // Hadoop reuses the value instance while iterating, so every record
                // that is kept must be copied first.
                if (bean.getFlag() == 1) {
                    copyInto(productBean, bean);
                } else {
                    InfoBean orderBean = new InfoBean();
                    copyInto(orderBean, bean);
                    orderList.add(orderBean);
                }
            }
            for (InfoBean orderBean : orderList) {
                orderBean.setPname(productBean.getPname());
                orderBean.setCategory_id(productBean.getCategory_id());
                orderBean.setPrice(productBean.getPrice());
                context.write(orderBean, NullWritable.get());
            }
        }

        /**
         * Copies every bean property from {@code src} into {@code dest}.
         *
         * @throws IOException if the reflective copy fails. The failure is propagated so
         *     the task fails visibly instead of silently dropping records.
         */
        private static void copyInto(InfoBean dest, InfoBean src) throws IOException {
            try {
                BeanUtils.copyProperties(dest, src);
            } catch (Exception e) {
                throw new IOException("Failed to copy InfoBean: " + src, e);
            }
        }
    }

    /**
     * Configures and submits the join job, exiting with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} input path containing the order and product files,
     *     {@code args[1]} output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MapReduceJoin <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapReduceJoin.class);

        job.setMapperClass(MapReduceJoinMapper.class);
        job.setReducerClass(MapReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
测试:将工程打包并上传到Linux服务器
创建输入目录
创建订单文件和商品文件编辑字段信息
将文件传到输入目录
执行程序
查看生成文件的内容
[root@mini1 ~]
[root@mini1 ~]
1001,20170710,P0001,1
1002,20170710,P0001,3
1003,20170710,P0002,3
1003,20170710,P0002,4
[root@mini1 ~]
P0001,xiaomi4,1000,2
P0002,iphone6s,1000,3
[root@mini1 ~]
[root@mini1 ~]
[root@mini1 ~]
oid=1002, date=20170710, pid=P0001, amount=3, pname=xiaomi4, category_id=1000, price=2
oid=1001, date=20170710, pid=P0001, amount=1, pname=xiaomi4, category_id=1000, price=2
oid=1003, date=20170710, pid=P0002, amount=4, pname=iphone6s, category_id=1000, price=3
oid=1003, date=20170710, pid=P0002, amount=3, pname=iphone6s, category_id=1000, price=3
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)