------------本文笔记整理自《Hadoop海量数据处理:技术详解与项目实战》范东来
一、设计思路
HDFS上存放两个文件,一个记录了学生基本信息(姓名,学号),文件名“student_info.txt”,文件内容为:
Jenny 00001 Hardy 00002 Bardley 00003 ... |
另一个文件记录了学生的选课信息(学号,课程名),文件名“student_class_info.txt”,文件内容为:
00001 Chinese 00001 Math 00002 Music 00002 Math 00003 Physic ... |
现在要对这两个文件进行join操作,得到结果为:
Jenny Chinese Jenny Math Hardy Music Hardy Math Bardley Physic ... |
该操作和SQL中的join操作类似,具体实现方式:先在map阶段读入student_info.txt和student_class_info.txt两个文件,并将学号作为输出键,姓名或课程在附加上文件名后作为输出值;再在reduce阶段对map中间结果进行笛卡尔乘积。
二、代码实现
1.Mapper类
package com.hadoop.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/*
* JoinMapper类
* map输入信息:
* 文件1:student_info.txt
* Jenny 00001
* Hardy 00002
* Bradley 00003
* ...
* 文件2:student_class_info.txt
* 00001 Chinese
* 00001 Math
* 00002 Music
* 00002 Math
* 00003 Physic
* ...
* Mapper处理后输出的中间结果:
* 00001 Jenny student_info.txt
* 00001 Chinese student_class_info.txt
* 00001 Math student_class_info.txt
* 00002 Hardy student_info.txt
* 00002 Music student_class_info.txt
* 00002 Math student_class_info.txt
* 00003 Bradley student_info.txt
* 00003 Physic student_class_info.txt
*/
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
//定义文件名称标识
private static final String LEFT_FILENAME = "student_info.txt";
private static final String RIGHT_FILENAME = "student_class_info.txt";
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//从输入分片信息中取得文件路径
//FileSplit 是 抽象类InputSplit 的实现类,记录了文件的具体切片信息。
String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
//文件标识
String fileFlag = null;
//输出键(学号)
String outKey = null;
//输出值(姓名 或 课程)
String outValue = null;
//行记录的信息
String[] infos = value.toString().split(" ");
//判断行记录所来自的文件
if (filePath.contains(LEFT_FILENAME)) {
fileFlag = LEFT_FILENAME;
outKey = infos[1];
outValue = infos[0];
}
else if (filePath.contains(RIGHT_FILENAME)) {
fileFlag = RIGHT_FILENAME;
outKey = infos[0];
outValue = infos[1];
}
//输出键值对,并在值上标记文件名
context.write(new Text(outKey), new Text(outValue + "\t" + fileFlag));
}
}
2.Reducer类
package com.hadoop.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* JoinReducer类
* Reducer的输入数据(Mapper的输出中间结果):
* 00001 Jenny student_info.txt
* 00001 Chinese student_class_info.txt
* 00001 Math student_class_info.txt
* 00002 Hardy student_info.txt
* 00002 Music student_class_info.txt
* 00002 Math student_class_info.txt
* 00003 Bradley student_info.txt
* 00003 Physic student_class_info.txt
* ...
* Reducer的输出结果(join结果)
* Jenny Chinese
* Jenny Math
* Hardy Music
* Hardy Math
* Bardley Physic
* ...
*
*/
public class JoinReducer extends Reducer<Text, Text, Text, Text>{
//定义文件名称标识
private static final String LEFT_FILENAME = "student_info.txt";
private static final String RIGHT_FILENAME = "student_class_info.txt";
private static int num = 0;
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//计数reduce调用次数,输出key
{
num++;
System.out.println(num + " " +key);
}
//学生姓名
String studentName = null;
//学生课程名数组
List<String> studentClassNames = new ArrayList<String>();
//根据文件名标识信息,将姓名、课程归类
for (Text value : values) {
String[] infos = value.toString().split("\t");
if(LEFT_FILENAME.equals(infos[1])) {
studentName = infos[0];
}
else if (RIGHT_FILENAME.equals(infos[1])){
studentClassNames.add(infos[0]);
}
}
//去除无法建立内连接的信息
if (studentName == null || studentClassNames.size() == 0) {
return;
}
//将姓名-课程 键值对遍历输出
for (int i = 0; i < studentClassNames.size(); i++) {
context.write(new Text(studentName), new Text(studentClassNames.get(i)));
}
}
}
3.驱动类
package com.hadoop.mr.join;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* join的驱动类
*/
public class MR_Join {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
//加载hadoop配置信息
Configuration conf = new Configuration();
//初始化作业信息
Job job = Job.getInstance(conf, "MR_Join");
job.setJarByClass(MR_Join.class);
//设置Mapper/Reducer类型
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
//设置输出键值对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置reduce任务个数(即分区数上限)
//job.setNumReduceTasks(2);
//设置文件输入/输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
三、打包Jar,并运行程序
1.将com.hadoop.mr.join包右键导出为 JAR file,命名为:"MR_Join.jar";
2.利用Windows的cmd或者PowerShell(推荐)将JAR文件上传到Linux服务器
命令如下:(在JAR文件目录下执行)
> scp MR_Join.jar root@remoteIP:~/myJars/mapreduce/
(其中remoteIP为远程服务器IP)
3.启动hadoop
--创建学生信息输入文件
> cd ~/myJars/mapreduce/
> touch student_info.txt
> vi student_info.txt
按键"i",进入编辑模式,向student_info.txt文件中输入内容,如下:
Jenny 00001
Hardy 00002
Bradley 00003
按键"ESC"-->"shift q"-->输入"wq!",回车,保存
--查看文件
> cat student_info.txt
--创建学生选课文件
> touch student_class_info.txt
> vi student_class_info.txt
按键"i",进入编辑模式,向student_class_info.txt文件中输入内容,如下:
00001 Chinese
00001 Math
00002 Music
00002 Math
00003 Physic
按键"ESC"-->"shift q"-->输入"wq!",回车,保存
--查看文件
> cat student_class_info.txt
--在HDFS中创建输入文件目录
> hadoop fs -mkdir /user/hadoop/joininput
--在HDFS中查看输入文件目录
> hadoop fs -ls /user/hadoop/joininput
--将本地的两个文件拷贝到HDFS的输入目录中(在"~/myJars/mapreduce/"下执行)
--可以多个文件一起传输
> hadoop fs -copyFromLocal student_info.txt student_class_info.txt /user/hadoop/joininput/
4.执行JAR,运行程序
命令如下:(在JAR文件目录"~/myJars/mapreduce/"下执行)
> hadoop jar MR_Join.jar com.hadoop.mr.join.MR_Join /user/hadoop/joininput /user/hadoop/joinoutput
运行过程中,屏幕会输出执行过程,直到完成
5.查看单词统计结果
成功执行完后,目录"/user/hadoop/joinoutput/"下会产生两个文件
/user/hadoop/joinoutput/_SUCCESS --成功执行完的空标识文件
/user/hadoop/joinoutput/part-r-00000 --作业输出结果文件
--查看输出文件
> hadoop fs -cat /user/hadoop/joinoutput/part-r-00000
Jenny Chinese
Jenny Math
Hardy Music
Hardy Math
Bardley Physic
<此即为join结果>
四、结果分析
1.查看日志文件,发现:
MapReduce任务为两个输入文件各创建了一个Mapper类来读入数据。
一共2个map任务(对应2个文件分片),1个reduce任务(默认)。
(因为两个文件都很小,所以一个文件就是一个输入切片)
注:reduce任务可通过job.setNumReduceTasks(2);来设置。
2.如果在MR_Join类中增加代码 job.setNumReduceTasks(2);,将reduce任务设置成2个(默认1个),
那么,会发现,输出结果文件变成了两个part-r-00000和part-r-00001
文件part-r-00000中:(分区一)
Jenny Math
Jenny Chinese
Bardley Physic
文件part-r-00001中:(分区二)
Hardy Math
Hardy Music
可见 1.reduce任务个数是分区个数的上限
(因为求解分区时hadoop默认对key进行hash分区,对reduce个数求余)
2.一个reduce任务对应一个输出文件
3.通过在JoinReducer类中增加reduce方法调用次数的计数输出代码
(注:程序的标准输出和错误输出不会在控制台上打印,会分别输出到对应任务的日志文件stdout和stderr中)
并在下列两个日志(最后两个日志是reduce任务日志)中
http://master:50070/logs/userlogs/application_id_0005/container_id_0005_01_000004/stdout
输出:
1 00001
2 00003
http://master:50070/logs/userlogs/application_id_0005/container_id_0005_01_000005/stdout
输出:
1 00002
可见:同一个分区中是包含不同key的记录的,Reducer任务会为每一个key调用一次reduce方法。
五、拓展链接(不同实现方式)
1.MapReduce:实现join的几种方法
2.MapReduce实现join操作
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)