/**
* hadoop经典入门wordcount 主要有三大步 1.编写mapper函数 2.编写reducer函数 3.配置
*/
public class WordCount {
/**
* mapper类
*
* 这些泛型继承自hadoop自定义的序列化框架Writable
* Hadoop使用自己的序列化框架以减少集群间,网络流量提高性能 也可以在这里使用avro的序列化框架
* avro的好处是语言无关,数据人类可读
*/
public static class SplitWordMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/*
* map函数会多次调用
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// StringTokenizer 是按照" "\t\n\r\f这5种情况区分单词的
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
// 将映射结果存入上下文对象中
context.write(word, one);
}
}
}
/**
* 一般情况下mapper的输出类型就是reducer的输入类型
*
*/
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
// 统计key相同的单词个数
for (IntWritable val : values) {
// 这个val.get()的返回值就是map中的one所包装的1
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
// 指定jar包类
job.setJarByClass(WordCount.class);
// 指定map类
job.setMapperClass(SplitWordMapper.class);
// 指定combiner类(这个类是指在单个节点完成mapper任务后),是否在mapper的输出端直接进行结果聚合,达到较少集群间网络流量的目的
// 该类的调用次数不确定,可能0,1,2...n次调用,但他不论调用多少次,都不会对最终结果造成影响
// 但是该结果会对mapper的输出结果产生影响,他可以减少shuffle的次数,
// 在本例中可以直接使用reducer函数,在求平均值的情况下,不可以使用
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 指定输出kv对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 这里没有指定输出文件的类型,默认使用TextOutputFormat
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
打成jar包
放到hadoop集群运行
jar包mainclass
Hadoop运行命令
hadoop jar word.jar input/3.txt output
输出目录:
结果:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)