前边写了MapReduce
的介绍、以及四大组件、序列化机制和排序。
这一篇记录一下MapReduce
相关的job
机制,对于在代码里,我们总要有一个Driver
,比如下边:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//获取job对象
Job job = Job.getInstance(conf);
//设置job方法入口的驱动类
job.setJarByClass(ProfitDriver1.class);
//设置Mapper组件类
job.setMapperClass(ProfitMapper1.class);
//设置mapper的输出key类型
job.setMapOutputKeyClass(Text.class);
//设置Mappper的输出value类型,注意Text的导包问题
job.setMapOutputValueClass(IntWritable.class);
//设置reduce组件类
job.setReducerClass(ProfitReducer1.class);
//设置reduce输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/profit"));
//设置输出结果路径,要求结果路径事先不能存在
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/profit/result"));
//提交job
job.waitForCompletion(true)
}
}
当我们在写完组件时,比如Mapper
、Reducer
组件时,都会到这个类里,将它们定义到Job
里,这样任务才能执行起来。那它的工作流程是啥呢?
一、job机制
1、job的介绍
当我们在集群中,将我们写的Driver
和这些组件类,打成jar
包。执行这个 MapReduce
的jar
包时,比如下边的命令:
hadoop jar flow.jar
执行后,大概的步骤如下(hadoop 1.0版本):
- 将该
jar
包临时上传到hdfs
里的tmp
目录下,然后解析jar包,查看环境及命令是否符合规范,符合则访问jobtracker
。
-
jobtracker
访问namenode
获取执行文件的信息,然后对数据进行逻辑切片
(一般该切片大小的设置与实际存储在datanode
上的切块大小一致)。
- 切片之后,会按照切片数量,交给不同的
tasktracker
,tasktracker
按照自己获得的切片,会向datanode
发起rpc
请求。
-
datanode
会将块返回给tasktracker
,创建任务,生成jobid
,并执行,最后删除tmp
的jar
包。
这种即是mapreduce
的job任务简单流程。
2、job任务的要点
-
job
任务在对数据进行逻辑切片时,一般与datanode
的切块大小一致,因为如果两者不一致,势必会有map
任务,需要从两个datanode
上获取完整的数据,当split
大于block
,会引起数据的拷贝,从另一个datanode
找剩余的数据,拷贝到mapreduce
任务所在节点上,然后再执行。
- 因为
namenode
网络中访问量较大,一般jobtracker
和namenode
在一台机器上,减少带宽的影响,避免网络传输。
- 另外,常理上来说,
datanode
和tasktracker
也是会在同一台机器上,但如果两者数量不一致的时候,那对应可能就会处在其中任意一台,数据只能从其他机器上拷贝数据了。(这个我其实没缕明白。可能不对。)
3、Hadoop 1.X 版本的内部执行流程
hadoop1.0 job任务的执行流程图如下:
第一阶段:Run Job
该阶段由客户端来完成,底层有一个jobClient类来做具体实现。该阶段主要完成:
①、做job环境信息的收集,比如各个组件类,以及输出的key、value类型,然后检测是否合法;
②、检测输入的路径是否合法,以及输出结果路径的合法性;
如果以上检查未通过,则直接报错,不会做后续的job提交动作。
第二阶段:Get New Job ID
如果第一阶段run Job
检测通过,jobClient
会向JobTracker
为当前job申请一个jobID,jobID是全局唯一的,用于标识一个job。
第三阶段:Copy Job Resources
这个阶段是JobClient
把job的运算资源上传到HDFS
。路径为:/tmp/hadoop-yarn/staging/UserName/
。
运算资源包括:①、jar包 ②、文件的切片信息 ③、job.xml整个job的环境参数
第四阶段:Submit job
当jobClient
将运算资源上传到HDFS之后,提交Job。
第五阶段:Initialise Job
初始化Job信息
第六阶段:Retrieve Input Splits
获取Job的切片数量
注:第五第六两个阶段的目的在于获取整个Job的MapTask任务数量和ReduceTask的任务数量,MapTask任务数量=切片(Split)数量,ReduceTask
的任务数量为代码中设定,默认为1。
第七阶段:HeartBeat(Return Tasks)
TaskTracker
通过心跳机制向jobTracker
领取任务,要满足数据本地化策略,即优先领取保存在它所在服务器上的那部分数据块。切片和切块有所不同。
切片是一个对象(FileSplit),封装的是一个块的描述信息;切块是文件块,里面存储的是真正的文件,切块是真实的数据。
补充: MR框架虽然在处理任务时满足数据本地化策略,但是,为了确保读取的完整性,也会做行的追溯(因为切块不是根据行切,而是根据大小切,肯定会有数据断开的情况),这个过程肯定会发生网络数据的传输。
第八阶段:Retrieve Job Resources
TaskTracker
去HDFS下载Job的运行资源。体现的思想是:移动的是运算(运算资料),而不是数据,也是尽可能的节省带宽。
第九阶段:Launch
启动JVM进程。
第十阶段:Run
运行MapTask
和ReduceTask
。
二、 job链
1.介绍
有这么一种场景,比如写了一套Mapper
和Reducer
组件,但是完不成我们的最后的计算工作,可能还需要将前边的结果集,再作为源数据进行计算。这样的话可能就需要多个job依次执行,这样就需要用到了job链的配置。
2.job链的写法
找到Driver
类,在最后job.waitForCompletion(true)
,将其改为如下:
public class ProfitDriver1 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//获取job对象
Job job = Job.getInstance(conf);
//设置job方法入口的驱动类
job.setJarByClass(ProfitDriver1.class);
//设置Mapper组件类
job.setMapperClass(ProfitMapper1.class);
//设置mapper的输出key类型
job.setMapOutputKeyClass(Text.class);
//设置Mappper的输出value类型,注意Text的导包问题
job.setMapOutputValueClass(IntWritable.class);
//设置reduce组件类
job.setReducerClass(ProfitReducer1.class);
//设置reduce输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入路径
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/profit"));
//设置输出结果路径,要求结果路径事先不能存在
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/profit/result"));
//提交job
//使用了MR框架提供的job链机制
if(job.waitForCompletion(true)){
Job job2 = Job.getInstance(conf);
job2.setMapperClass(ProfitMapper2.class);
job2.setMapOutputKeyClass(Profit.class);
job2.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job2, new Path("hdfs://hadoop01:9000/profit/result"));
FileOutputFormat.setOutputPath(job2, new Path("hdfs://hadoop01:9000/profit/result1"));
job2.waitForCompletion(true);
};
}
}
以上,即简单实现了多个job一次启动的问题。