1. Overall MapReduce development workflow
A minimal MapReduce application consists of at least three parts: a Map function, a Reduce function, and a main function. When a MapReduce job runs, it is divided into two phases, the map phase and the reduce phase; each phase takes key/value pairs as input and produces key/value pairs as output. The main function ties job configuration together with file input/output.
2. Environment preparation
See the earlier sections on setting up the Hadoop cluster, deploying the Hadoop package on Windows, and HDFS API development.
Prepare an Eclipse Java SE development environment with the hadoop-eclipse-plugin.
Hadoop jar preparation:
From the root of the Hadoop distribution deployed on Windows (the Hadoop development package download), the following jars are needed:
hadoop-2.6.5\share\hadoop\hdfs\hadoop-hdfs-2.6.5.jar
hadoop-2.6.5\share\hadoop\hdfs\lib\ (all jars)
hadoop-2.6.5\share\hadoop\common\hadoop-common-2.6.5.jar
hadoop-2.6.5\share\hadoop\common\lib\ (all jars)
hadoop-2.6.5\share\hadoop\mapreduce\ (all jars except hadoop-mapreduce-examples-2.6.5.jar)
hadoop-2.6.5\share\hadoop\mapreduce\lib\ (all jars)
Create a user library in Eclipse from the jars above and add it to the project. (Optionally, download the Hadoop source package to study the framework's internal implementation.)
Start the ZooKeeper cluster, the Hadoop HA cluster, and the MapReduce (YARN) cluster.
3. Example development
3.1 Example overview
Count the total number of occurrences of each word across the whole data set (wordcount).
3.2 Data flow
word.txt is divided into input splits, and each split is processed by its own map task:
word.txt_split01 → map_task01
word.txt_split02 → map_task02
word.txt_split... → map_task...
Between the two phases, the shuffle step routes all map outputs with the same key to the same reduce task; each reduce task then iterates over the grouped values and accumulates the counts:
Shuffle → reduce_task01, reduce_task02, reduce_task...
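The map → shuffle → reduce flow above can be sketched in plain Java with no Hadoop dependencies (the two-split input below is made up purely for illustration):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Two hypothetical input splits, each handled by its own map task.
        String[] splits = { "hello world hello", "world hello" };

        // Map phase: every token becomes a (word, 1) pair.
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String split : splits) {
            for (String word : split.split("\\s+")) {
                mapOutput.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }

        // Shuffle phase: group values by key, as the framework does
        // between the map and reduce phases (sorted by key here).
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }

        // Reduce phase: iterate over each key's values and accumulate.
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            System.out.println(e.getKey() + "\t" + sum);
        }
    }
}
```

Running this prints each word with its total count, which is exactly what the real job produces on the cluster, just without the distribution.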
3.3 Upload the data file word.txt
MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster
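Assuming the HDFS client is on the PATH and pointed at the cluster, the file can be uploaded with the standard HDFS shell (the local file name word.txt and the directory /wc/input, which matches the job's input path below, are the only assumptions):

```shell
# create the input directory on HDFS
hdfs dfs -mkdir -p /wc/input
# upload the local word.txt
hdfs dfs -put word.txt /wc/input
# verify the upload
hdfs dfs -ls /wc/input
```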
3.4 Writing the code
3.4.1 Mapper implementation
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapwc extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The input key is the byte offset of the line; the value is the line itself.
        String line = value.toString();
        // Emit (word, 1) for every whitespace-separated token.
        StringTokenizer words = new StringTokenizer(line);
        while (words.hasMoreTokens()) {
            context.write(new Text(words.nextToken()), new IntWritable(1));
        }
    }
}
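A quick standalone check of the tokenization (plain Java, no Hadoop needed) shows why punctuation survives in the counts: StringTokenizer splits on whitespace only, so "parallel," keeps its trailing comma, exactly as it appears in the run results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeSketch {
    public static void main(String[] args) {
        // A fragment of the sample sentence from word.txt.
        String line = "sets with a parallel, distributed algorithm on a cluster";
        StringTokenizer words = new StringTokenizer(line);
        List<String> tokens = new ArrayList<>();
        while (words.hasMoreTokens()) {
            tokens.add(words.nextToken());
        }
        // Note the token "parallel," — the comma is not stripped.
        System.out.println(tokens);
    }
}
```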
3.4.2 Reducer implementation
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducewc extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text words, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // The framework has already grouped the values by key; sum the 1s.
        int sum = 0;
        for (IntWritable s : values) {
            sum += s.get();
        }
        context.write(words, new IntWritable(sum));
    }
}
3.4.3 Job submission client
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Jobwc {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        // Hostname only; this property does not take a port
        // (8088 is the ResourceManager web UI port).
        configuration.set("yarn.resourcemanager.hostname", "node02");

        Job job = Job.getInstance(configuration);
        job.setJarByClass(Jobwc.class);
        job.setJobName("wc");
        job.setMapperClass(Mapwc.class);
        job.setReducerClass(Reducewc.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPaths(job, "/wc/input/word.txt");

        // Delete the output directory if it already exists;
        // otherwise the job fails on startup.
        Path path = new Path("/wc/output");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        try {
            if (job.waitForCompletion(true)) {
                System.out.println("job success!");
            } else {
                System.out.println("job failed!");
            }
        } catch (ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
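One way to package and submit the job from the command line (the jar name wc.jar and the class file locations are assumptions; adjust them to your build setup):

```shell
# package the three compiled classes into a jar
jar cf wc.jar Mapwc.class Reducewc.class Jobwc.class
# submit the job to the cluster
hadoop jar wc.jar Jobwc
# once the client prints "job success!", inspect the result file
hdfs dfs -cat /wc/output/part-r-00000
```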
3.4.4 Run results
MapReduce 1
a 3
algorithm 1
an 1
and 2
associated 1
big 1
cluster 1
data 1
distributed 1
for 1
generating 1
implementation 1
is 1
model 1
on 1
parallel, 1
processing 1
programming 1
sets 1
with 1