1. MapReduce作业处理过程概述
当用户使用Hadoop的Mapreduce计算模型来进行处理问题时,用户只需要定义所需的Mapper和Reduce处理函数,还有可能包括的Combiner、Comparator、Partition等函数;之后,新建一个Job对象,并Job的运行环境进行相应的配置,最后调用Job的waitForCompletion或者submit方法提交作业。具体代码结构如下:
Configuration conf = new Configuration(true);
conf.set("fs.defaultFS", "hdfs://node02:8020");
conf.set("yarn.resourcemanager.hostname", "node02:8088");
Job job = Job.getInstance(conf);
job.setJarByClass(主类名.class);
job.setMapperClass(Mapper实现类.class);
job.setCombinerClass(作为Combiner的Reducer实现类.class);
job.setReducerClass(Reducer实现类.class);
job.setOutputKeyClass(输出key的数据类型.class);
job.setOutputValueClass(输出value的数据类型.class);
Path input = new Path("设置作业的输入路径");
FileInputFormat.addInputPath(job, input);
Path output = new Path("设置作业的输出路径");
FileOutputFormat.setOutputPath(job, output );
job.waitForCompletion(true);
job的waitForCompletion方法内部实际上是依靠JobClient来向JobTracker来提交作业的。当JobTracker接收到JobClient的提交作业的请求后,将会作业加入到作业队列中,之后会返回给JobClient一个用于唯一标识该作业的JobID对象。JobTracker作业队列中的作业任务会由TaskTracker来执行。TaskTracker会定期向JobTracker发送心跳,查询JobTracker是否有作业需要执行。当TaskTracker接收到任务之后,会在本地启动一个Task来执行任务。
2. MapReduce作业的配置信息——JobConf
JobConf所在的包位于org.apache.hadoop.mapred。该类集成自Configuration,它在原有的Hadoop的Configuration基本信息的基础上加入了与MapReduce作业相关的配置信息。Job继承自JobContext类,而JobContext中有个protected final org.apache.hadoop.mapred.JobConf conf的成员变量,所以Job内部会调用JobConf的方法来完成该MapReduce作业的配置,比如Job中setInputFormatClass设置作业输入格式的方法:
public void setInputFormatClass(Class<? extends InputFormat> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
InputFormat.class);
}
2.1 静态代码块
static{
ConfigUtil.loadResources();
}
2.2 成员变量
public static final long DISABLED_MEMORY_LIMIT = -1L;
public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
public static final String DEFAULT_QUEUE_NAME = "default";
static final String MAPREDUCE_JOB_MAP_MEMORY_MB_PROPERTY = JobContext.MAP_MEMORY_MB;
static final String MAPREDUCE_JOB_REDUCE_MEMORY_MB_PROPERTY = JobContext.REDUCE_MEMORY_MB;
public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
.....
2.3 成员方法
该类的大部分方法都是用于设置和获取与该作业相关的配置项的set和get方法,此外还有几个工具方法:
public static String findContainingJar(Class my_class) {
return ClassUtil.findContainingJar(my_class);
}
public class ClassUtil {
public static String findContainingJar(Class<?> clazz) {
ClassLoader loader = clazz.getClassLoader();
String classFile = clazz.getName().replaceAll("\\.", "/") + ".class";
try {
for(final Enumeration<URL> itr = loader.getResources(classFile);
itr.hasMoreElements();) {
final URL url = itr.nextElement();
if ("jar".equals(url.getProtocol())) {
String toReturn = url.getPath();
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length());
}
toReturn = URLDecoder.decode(toReturn, "UTF-8");
return toReturn.replaceAll("!.*$", "");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
}
3. JobSubmissionProtoco接口
4. RunningJob接口
5. JobStatus、JobProfile、JobSubmissionFiles
5. 提交Job的客户端——JobClient
JobClient是用户作业和JobTracker进行交互的接口。JobClient为用户提供了用户提交作业、跟踪作业的处理进度、访问组成作业的工作日志、取得MapReduce集群的状态信息等方法。
5.1 静态代码块
static{
ConfigUtil.loadResources();
}
5.2 成员变量
5.3 内部类
static class NetworkedJob implements RunningJob {
......
}
5.4 成员方法
public void init(JobConf conf) throws IOException {
......
}
public RunningJob submitJobInternal(final JobConf conf)
throws FileNotFoundException, IOException {
......
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)