Writing a Simple WordCount Program
Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reuse the same Writable objects across map() calls to avoid object churn
    private final LongWritable v = new LongWritable(1);
    private final Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line; value is the line itself.
        // Assumes words are separated by single spaces (see the note below).
        String[] words = value.toString().split(" ");
        // A plain loop instead of forEach lets IOException/InterruptedException
        // propagate instead of being swallowed inside a lambda
        for (String word : words) {
            k.set(word);
            context.write(k, v); // emit (word, 1)
        }
    }
}
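One caveat: String.split(" ") splits only on single spaces, so runs of spaces or tabs yield empty or merged tokens that get counted as "words". A quick standalone check of this Java behavior:

public class SplitDemo {
    public static void main(String[] args) {
        // Two spaces in a row produce an empty token: [a, , b]
        System.out.println(java.util.Arrays.toString("a  b".split(" ")));
        // Splitting on the whitespace pattern \s+ avoids this: [a, b]
        System.out.println(java.util.Arrays.toString("a \t b".split("\\s+")));
    }
}

If the input may contain irregular whitespace, split("\\s+") plus a check for empty words is the safer choice.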
Reducer
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    private final LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // The framework has already grouped map output by key,
        // so values holds every 1 emitted for this word
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v); // emit (word, total count)
    }
}
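To make the data flow concrete, here is a minimal plain-Java simulation of what happens between the Mapper and the Reducer: map emits (word, 1) pairs, the shuffle groups (and sorts) them by key, and reduce sums each group. This only illustrates the semantics; it is not how Hadoop actually executes a job:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSimulation {
    public static void main(String[] args) {
        String[] lines = {"hello world", "hello hadoop"};

        // "Map": emit (word, 1) for every word
        List<Map.Entry<String, Long>> mapOutput = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                mapOutput.add(Map.entry(word, 1L));
            }
        }

        // "Shuffle": group the 1s by word (a TreeMap also sorts keys, as Hadoop does)
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), w -> new ArrayList<>()).add(kv.getValue());
        }

        // "Reduce": sum each group, exactly like WordCountReducer
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0;
            for (long one : entry.getValue()) sum += one;
            System.out.println(entry.getKey() + "\t" + sum);
        }
        // Prints: hadoop 1, hello 2, world 1
    }
}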
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Where to find the job classes
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Map output types, then final (reduce) output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input and output paths from the command line;
        // the output directory must not exist yet, or the job fails fast
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean ret = job.waitForCompletion(true);
        System.exit(ret ? 0 : 1);
    }
}
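Two optional one-liners fit naturally in this driver. Because WordCountReducer's input and output types are identical, it can also serve as a combiner, pre-aggregating (word, 1) pairs on the map side to cut shuffle traffic; and the number of reduce tasks (default 1) can be set explicitly. These lines would go right after job.setReducerClass(...) in main():

        // Optional: reuse the reducer as a map-side combiner
        job.setCombinerClass(WordCountReducer.class);
        // Optional: run two reducers instead of the default single one
        job.setNumReduceTasks(2);

After packaging the three classes into a jar, the job is typically launched with hadoop jar wordcount.jar WordCountDriver <input> <output>, where the last two arguments become args[0] and args[1].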
Submitting the job kicks off the call chain below inside job.waitForCompletion(true) (indentation shows nesting):

job.waitForCompletion(true);
    submit();
        connect();
            new Cluster(getConfiguration())
                initialize(jobTrackAddr, conf);
                    clientProtocol = provider.create(conf); // LocalJobRunner or YARNRunner
        submitter.submitJobInternal(Job.this, cluster);
            Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
            JobID jobId = submitClient.getNewJobID();
            copyAndConfigureFiles(job, submitJobDir);
            int maps = writeSplits(job, submitJobDir);
            writeConf(conf, submitJobFile);
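Which protocol provider.create(conf) hands back is controlled by the mapreduce.framework.name property. A minimal sketch of lines that could go at the top of the driver's main() ("rm-host" is a placeholder hostname):

        Configuration conf = new Configuration();
        // "local" (the default) resolves to LocalJobRunner; "yarn" resolves to YARNRunner
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "rm-host"); // placeholder
        Job job = Job.getInstance(conf);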
Code trace of the main steps:
1. job.waitForCompletion(true) enters submit()
2. Connect to the cluster
2.1: A new Cluster object is created
2.2: The Cluster constructor calls initialize(jobTrackAddr, conf)
2.3: initialize() creates a clientProtocol object
2.4: ClientProtocolProvider has two implementations, local (LocalJobRunner) and yarn (YARNRunner), selected via mapreduce.framework.name (see the sketch above)
3. Submit the job
3.1: Get a staging directory; in yarn mode this directory is created on HDFS
3.1.1: The staging directory that gets created (this walkthrough runs in local mode, so the staging directory is on the local filesystem; in yarn mode it is created on HDFS)
3.2: Get a jobId from the cluster (submitClient.getNewJobID())
3.3: Copy the job's files and jar into the staging directory
3.4: Compute the input splits and generate the split plan file
3.4.1: From writeSplits you can see that a custom InputFormat can supply its own split-generation rules (the default logic comes from FileInputFormat); see the sketch after this list
3.5: Write the job configuration file (job.xml) into the staging directory
The staging directory after the writeConf method completes
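As a concrete example of step 3.4.1, split generation can be influenced from the driver without writing a custom InputFormat: the lib.input helpers expose min/max split sizes, and FileInputFormat computes each split size as max(minSize, min(maxSize, blockSize)). A sketch of lines that could be added to main() (the 64 MB cap is an arbitrary example; TextInputFormat needs an extra import from org.apache.hadoop.mapreduce.lib.input):

        // Explicitly set the default line-oriented InputFormat
        job.setInputFormatClass(TextInputFormat.class);
        // Cap each split at 64 MB: splitSize = max(minSize, min(maxSize, blockSize))
        FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024);
        FileInputFormat.setMinInputSplitSize(job, 1);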