hadoop作业执行流程及代码简略解读

2023-11-10

hadoop作业执行流程及代码简略解读


本文:参考了网上的博文。出处也不知是哪里,不好意思。最近整理磁盘文档发现的好资料所以整理补充了一下供大家学习参考一下吧。

1.主要组成部分:
  Hadoop包括hdfs与mapreduce两部分,hdfs则为底层的分布式存储系统、mapreduce则为用于处理存储在hdfs中的数据的编程模型。
  mapreduce作业提交执行主要涉及到这几个主要的类:jobclient,JobTracker与TaskTracker。
     1.1.JobClient
             每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,并把路径提交到JobTracker,然   后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。
     1.2.JobTracker
             JobTracker是一个master服务,启动之后JobTracker会接收job,负责调度job的每个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。
     1.3.TaskTracker
             TaskTracker是运行于多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。2.主要数据结构
     2.1.JobInProgress
             JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指 定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。
     2.2.TaskInProgress
             JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的 TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一 定运行在同一个TaskTracker中。
     2.3 MapTask和ReduceTask
               一个完整的job会自动依次执行Mapper、Combiner(如有的话)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集 按<key1,value1>对读入,处理完成生成临时的<key2,List value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3> 对。
3.整体流程
      一道MapRedcue作业是通过JobClient.rubJob(job)向master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。JobTracker一直在等待JobClient通过RPC提交作业,而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。


4.Jobclient
        4.0 配置一个作业
         在编写MapReduce程序时通常是上是这样写的:
         Job job=new Job(conf,"ClosedDataCube");
         //指定作业代码

         job.setJarByClass(ClosedDataCube.class);
         //设置map

         job.setMapperClass(cubeMapper.class);
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(Text.class);
         //设置combiner
         job.setCombinerClass(cubeCombiner.class);
         //设置reduce
         job.setReducerClass(cubeReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
         //设置输入输出
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job,new Path(args[1]));
 
         System.exit(job.waitForCompletion(true)?1:0);
  类Job:以作业提交者得角看作业的状况。允许用户配置作业、提交作业、控制执行、查询状态。所有的set方法在提交作业时开始执行。
4.1 提交作业
        1.job.waitForCompletion(true):
         提交作业并等待其执行结束。在这里主要通过submit()方法提交一个作业。
        2.submit()方法的流程:
        RunningJob info=jobClient.submitJobInternal(conf); 通过JobClient提交一个作业。同时返回一个RunningJob 对象用于用户查询这个作业的相信信息。RunningJob是一个接口只有一个实现是JobClient中的NewWorkedJob描述一个job的详细信息。此时 job的状态为运行状态。
       3.JobClient.submitJobInternal(conf)
       JobClient是用户与JobTracker联系的一个基本的接口。提供提交作业的基本接口、跟踪作业、获得作业的运行状态信息等。
              0.创建作业的JobID并提交三个文件
              job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
              job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。
              job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。
             1.检查作业的输入、输出
             2.计算map数目。即计算输入分片的数目。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。
             3.向JobTracker的fs中写入job文件
             4.jobSubmitClient.submitJob(jobId)真正的提交一个作业。并返回作业的状态对象句柄。
 4.jobSubmitClient.submitJob(jobId)
         jobSubmitClient是JobSubmissionProtocol的接口的对象。这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。这里讲JobTracker的处理流程。
         jobFile的提交过程是通过RPC(远程进程调用)模块来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口调用创建了JobTracker对象。与master取得联系。并调用的submitJob()方法提交。JobTracker创建job成功后会给JobClient传回一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。


5.JobTracker
        JobTracker是在网络环境中提交及运行MR任务的核心位置。
5.0 JobTracker启动
       JobTracker类中有一个main()函数,hadoop启动的时候执行此main()函数启动JobTracker进程,main()中生成一个JobTracker的对象,然后通过tracker.offerService()语句启动服务,即启动一些线程:
       1.taskScheduler:一个抽象类,被JobTracker用于安排执行在TaskTrackers上的task任务,它使用一个或多个JobInProgressListeners接收jobs的通知。另外一个任务是调用JobInProgress.initTask()为job初始化tasks。启动,提交作业,设置配置参数,终止等方法。
       2.completedJobsStoreThread对应completedJobStatusStore;CompletedJobStatusStore类:把JobInProgress中的job信息存储到DFS中;提供一些读取状态信息的方法;是一个守护进程,用于删除DFS中的保存时间超过规定时间的job status删除。
       3.interTrackerServer,抽象类Server类型的实例。一个IPC (Inter-Process Communication,进程间通信)服务器,IPC调用一个以一个参数的形式调用Writable,然后返回一个Writable作为返回值,在某个端口上运行。提供了call,listener,responder,connection, handle类。包括start(),stop(),join(),getListenerAddress(),call()等方法。

       job是统一由JobTracker来调度的,把具体的Task分发给各个TaskTracker节点来执行。下面来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。
5.1 JobTracker.submitJob(JobID)
      1.创建JobInProgress用于跟踪和调度这个job。维护着这个job的所有信息。
      JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系 统中的临时目录里。这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。
      2.检查这个job的操作是否可以执行
      3.检查集群内存是否允许运行这个job
      4.调用jobAdd(JobID,JobInProgress)将这个作业添加到jobs队列中

5.2 JobTracker.jobAdd(JobID,JobInProgress)
      1.调用监听器类EagerTaskInitializationListener将作业加入到jobInitQueue队列中。
               1.JobTracker 中的监听器组EagerTaskInitializationListener负责任务Task的初始化.JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一个专门管理需要初始化的队列里,即一个list成员变量jobInitQueue里。resortInitQueue 方法根据作业的优先级排序。然后调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitManager来处理。
               2.其中的另一个监听器JobQueueJobInProgressListener用于调度作业执行。在这里为这个作业创建一个作业调度信息并一起放入到job队列中等待调度。

      2.JobInitManager
      JobInitManager收到信号后即取出最靠前的job,即优先级别最高的job,通过线程池开始进行真正调度。调度是通过调用JobInProgress.initTasks()实现。
5.3 JobInProgress.initTasks() 初始化MR任务
      任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。
      1.读取分片、得到分片数目
       调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的host,这个会在RawSplit创 建时通过FileSplit的getLocations()函数获取,该函数会调用DistributedFileSystem的getFileCacheHints()获得。当然如果是存储在本地文件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。
      2.计算job的任务数是否超出限制,超出则不可执行。否则下一步
      3.首先JobInProgress会创建Map的监控对象。在initTasks()函数里通过建TaskInProgress对象数组实现。并且对对象数组初始化。
      4.initTasks()方法会通过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的 TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。
      5.JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者 ReduceTask。同样地,initTasks()也会通过createCache()方法产生nonRunningReduces成员。
      6.JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束。
5.4JobTracker调度Job
      0.hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量 jobQueueJobInProgressListener与上面说的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一个监听器类,它包含了一个映射,用来管理和调度所有的JobInProgress。jobAdded(job)同时会加入job到JobQueueJobInProgressListener中的映射。
      1.JobQueueTaskScheduler.assignTasks(TaskTrackerStatus) ,他实现了工作调度。
      1.JobTracker 接到TaskTracker 的heartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。首先它会检查 TaskTracker 端还可以做多少个 map 和 reduce 任务,将要派发的任务数是否超出这个数,是 否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask 或 ReduceTask 。 
       2.产生 Map 任务使用 JobInProgress 的 obtainNewMapTask() 方法,实质上最后调用了 JobInProgress 的 findNewMapTask() 访问 nonRunningMapCache 。
       上面讲解任务初始化时说过,createCache()方法会在网络拓扑结构上挂上需要执行的TaskInProgress。findNewMapTask()从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同数据中心下的节点,直到找了maxLevel层结束。这样的话,在 JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。
      3.最终生成一个Task类对象,该对象被封装在一个LanuchTaskAction 中,发回给TaskTracker,让它去执行任务。

      4.产生 Reduce 任务过程类似,使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress 的 findNewReduceTask() 访问 nonRunningReduces。


6. TaskTracker
       TaskTracker是在网络环境中开始和跟踪任务的核心位置。与Jobtracker连接请求执行任务而后报告任务状态
6.0 TaskTracker的启动
        1. 与JobTracker一样,里面包含一个main()方法,在hadoop启动的时候启动此进程。
        Main()方法最主要的一句话  TaskTracker(conf).run();
        TaskTracker(conf)获取本机的一些配置信息,初始化服务器并启动服务器(StatusHttpServer);然后调用initialize(),这个方法才是真正构造TaskTracker的地方,把它作为一个单独的方法便可以再次调用并可以在close()之后回收对象,就是初始化一些变量对 象,最后启动线程:
        taskMemoryManager为TaskMemoryManagerThread类的对象。管理本机上task运行时内存的使用,杀死任何溢出和超出内存限制的task-trees。
        mapLauncher与reduceLauncher都是TaskLauncher类的对象,其作用是启动maptask和reducetask任务线程。根据tasksToLaunch判断是否需要新建任务,其中的调用的关系为:run()→startNewTask()→localizeJob()→launchTaskFor→JoblaunchTask() →localizeTask。
       2.run()方法中启动TaskTracker服务器然后一直循环。循环会尝试连接到的JobTracker。主要调用了两个方法startCleanupThreads(),offerService()。
       startCleanupThreads()启动为守护进程,可以用来删除一个独立线程的路径。
       offerService()类似于JobTracker中的offerService()方法,即服务器执行的主循环。规定的时间内给JobTracker发送心跳信息,并处理返回的命令。

下面具体介绍流程中的每一步。
6.1 TaskTracker加载Task到子进程
       Task的执行实际是由TaskTracker发起的,TaskTracker会定期与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等。如果发现有自己需要执行的新任务也会在这时启动,即是在TaskTracker调用JobTracker的heartbeat()方法时进行,此调用底层是通过IPC层调用Proxy接口实现。
       1.TaskTracker.run() 连接JobTracker
       TaskTracker的启动过程会初始化一系列参数和服务,然后尝试连接JobTracker(即必须实现InterTrackerProtocol接口),如果连接断开,则会循环尝试连接JobTracker,并重新初始化所有成员和参数。
       2.TaskTracker.offerService() 主循环
       如果连接JobTracker服务成功,TaskTracker就会调用offerService()函数进入主执行循环中。这个循环会每隔10秒与JobTracker通讯一次,调用transmitHeartBeat(),获得HeartbeatResponse信息。然后调用HeartbeatResponse的getActions()函数获得 JobTracker传过来的所有指令即一个TaskTrackerAction数组。再遍历这个数组,如果是一个新任务指令是LaunchTaskAction则调用调用addToTaskQueue加入到待执行队列,如果为commitTaskAction则否则加入到commitResponses。否则tasksToCleanup队列,交给一个 taskCleanupThread线程来处理,如执行KillJobAction 或者KillTaskAction等。
       3.TaskTracker.transmitHeartBeat() 获取JobTracker指令
       在transmitHeartBeat()函数处理中,TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务的执行状况,检查目前执行的Task数目以及本地磁盘的空间使用情况等,如果可以接收新的Task则设置heartbeat()的askForNewTask参数为true。然后通过IP C 接口调用JobTracker的heartbeat()方法发送过去,heartbeat()返回值TaskTrackerAction数组。

      4.TaskTracker.addToTaskQueue,交给TaskLauncher处理
      TaskLauncher是用来处理新任务的线程类,包含了一个待运行任务的队列 tasksToLaunch。
             1.根据action中的任务类型即是MapTask还是ReduceTask调用相应的TaskLanucher的addToTaskQueue添加action到task队列中。
             2.TaskTracker.addToTaskQueue会调用TaskTracker的registerTask,创建TaskInProgress对象来调度和监控任务,并把它加入到runningTasks队列中。同时将这个TaskInProgress加到tasksToLaunch 中,并notifyAll()唤醒一个线程运行,该线程从队列   tasksToLaunch取出一个待运行任务,调用TaskTracker的startNewTask运行任务。
      5.TaskTracker.startNewTask() 启动新任务
      调用localizeJob()真正初始化Task并开始执行。
      6.TaskTracker.localizeJob() 初始化job目录等
      此函数主要任务是初始化工作目录workDir,再将job jar包从HDFS复制到本地文件系统中,调用RunJar.unJar()将包解压到工作目录。然后创建一个RunningJob并调用addTaskToJob()函数将它添加到runningJobs监控队列中。addTaskToJob方法把一个任务加入到该 任务属于的runningJob的tasks列表中。如果该任务属于的runningJob不存在,先新建,加到runningJobs中。完成后即调用launchTaskForJob()开始执行Task。
       7.TaskTracker.launchTaskForJob() 执行任务
       启动Task的工作实际是调用TaskTracker$TaskInProgress的launchTask()函数来执行的。
       8.TaskTracker$TaskInProgress.launchTask() 执行任务
       执行任务前先调用localizeTask()更新一下jobConf文件并写入到本地目录中。然后通过调用Task的createRunner()方法创建TaskRunner对象并调用其start()方法最后启动Task独立的java执行子进程。
       9.Task.createRunner() 创建启动Runner对象
       Task有两个实现版本,即MapTask和ReduceTask,它们分别用于创建Map和Reduce任务。MapTask会创建MapTaskRunner来启动Task子进程,而ReduceTask则创建ReduceTaskRunner来启动。
      10.TaskRunner.start() 启动子进程
                1.TaskRunner负责将一个任务放到一个进程里面来执行。它会调用run()函数来处理,主要的工作就是初始化启动java子进程的一系列环境变量,包括设定工作目录workDir,设置CLASSPATH环境变量等。然后装载job jar包。
                2.在run中通过jvmManager.launchJvm(TaskRunner,JvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,workDir, env, pidFile, conf))方法管理该TaskTracker上所有运行的Task子进程。每一个进程都是由JvmRunner来  管理的, 它也是位于单独线程中的。JvmManager的launchJvm方法启动一个jvm。根据任务是map还是reduce,生成对应的JvmRunner并放到对应JvmManagerForType的进程容器中进行管理。JvmManagerForType的reapJvm()为一个任务启动一个JVM。
                3.分配一个新的JVM进程。如果JvmManagerForType槽满,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。如果槽没有满,那么就启动新的子进程。生成新的进程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner线程的run  方法,run方法用于生成一个新的进程并运行它,具体实现是调用runChild.
                4.在执行即启动一个jvm即运行一个子进程。Child类。

6.3 子进程child执行MapTask
       0.真实的执行载体,是Child,它包含一个 main函数,进程执行,会将相关参数传进来,它会拆解这些参数,通过getTask(jvmId)向父进程索取任务,并且构造出相关的Task实例,然后使用Task的run()启动任务。
       1.run
       方法相当简单,配置完系统的TaskReporter后,就根据情况执行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或执行map。
       2. 执行map即runNewMapper(job, split, umbilical, reporter)
                1.获得TaskAttemptContext的对象taskcontext用于获得其他相关信息,通过气获得mapper类对象、设置输入格式inputformat、重建输入分片split、构建RecordReader
               2.构造Mapper的输出即output,目的是通过output收集map的结果。通过RecordWrite进行的,也分两种情况,如果没有Reducer用NewDirectMapOutputCollector,否则用NewOutputCollector
               这是在新的API下的。而新的API下面真正作用还是老API即MapOutputCollector其有两个子类:MapOutputBuffer和DirectMapOutputCollector。 DirectMapOutputCollector用在不需要Reduce阶段的时候。如果Mapper后续有reduce任务,系统会使用   MapOutputBuffer做为输出, 
              3.根据创建好的以上信息创建maper的context
              4.最后mapper.run(Context)执行map
       3.Mapper的run(Context)
       会先创建对应的key,value对象,然后,对InputSplit的每一对<key,value>,调用用户实现的Mapper接口实现类的map方法,每处理一个数据对,就要使用OutputCollector收集每次处理kv对后得到的新的kv对,把他们spill到文件或者放到内存,以做进一步的处 理,比如排序,combine等。
       4.NewOutputCollector即output等同于context
                1.context.write(key,value)
                通过NewOutputCollector的收集每次调用map后得到的新的kv对,并把他们spill到文件或者放到内存,以做进一步的处理,比如排序,combine等。而其中实际操作的是MapOutputBuffer对象collector进行结果的收集。collector.collect(key,    value,partitioner.getPartition(key, value, partitions));MapOutputBuffer使用了一个缓冲区对map的处理结果进行缓存,放在内存中,又使用几个数组对这个缓冲区进行管理。在创建这个对象是判断是否有combiner有的话实例化一个    CombinerRunner。根据这个对象是否被实例化在下面的操作中决定是否执行combiner。
               2.在适当的时机,缓冲区中的数据会被spill到硬盘中。
               向硬盘中写数据的时机:
             (1)当内存缓冲区不能容下一个太大的k v对时。spillSingleRecord方法。
             (2)内存缓冲区已满时。SpillThread线程。这是MapOutputBuffer的内部类。
             (3)Mapper的结果都已经collect了,需要对缓冲区做最后的清理。Flush方法。
       5.MapOutputBuffer中combiner说明:
               0.有关combiner的类是在Task中实现的是Task的内部类MapOutputBuffer的内部类,主要有:CombinerRunner其又有两个子类NewCombinerRunner及OldCombinerRunner,这个类是combiner的实现处;另一个是实现了OutputCollector的CombinerOutputCollector  用于收集Combiner的输出。
              1.创建,通过静态方法CombinerRunner.create(job, getTaskID(), combineInputCounter,reporter, null)创建一个CombinerRunner的对象。如Job中设置了Combiner则进行创建NewCombinerRunner或是OldCombinerRunner。或者返回一个null
              2.如果创建了即非空则创建CombinerOutputCollector对象用于收集结果。其内有一个计数器记录收集到的结果个数。
      6.spillThread线程:将缓冲区中的数据spill到硬盘中。
             1.需要spill时调用函数sortAndSpill,按照partition和key做排序。默认使用的是快速排序QuickSort。如果没有combiner,则直接将记录写入到相应的分区中,否则,调用CombinerRunner的combine,先做combiner。然后输出。
             2.有combiner时处理时,首先为outputcollector设置write用于写文件。然后创建一个用于迭代map结果的Iterator。
             3.combinerRunner.combine(kvIter, combineCollector)执行combiner
             4.在NewCombinerRunner的combiner方法中通过反射获得reduce类对象,Combiner是继承与Reduce的。创建用于收集reduce结果的Reduce.Context.
            5.reducer.run(reducerContext)开始真正执行combiner就是一个reduce任务。 

6.4 子进程执行ReduceTask
      0.ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),runJobSetupTask(),runTaskCleanupTask()。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。
      1. Copy
      就是从执行各个Map任务的服务器那里,收到map的输出文件。拷贝的任务,是由ReduceTask.ReduceCopier 类来负责。通过ReduceCopier的fetchOutputs()方法取得map的结果
     流程: 使用ReduceCopier.fetchOutputs开始
   (1)索取任务。使用GetMapEventsThread线程。该线程的run方法不停的调用getMapCompletionEvents方法,该方法又使用RPC调用TaskUmbilicalProtocol协议的getMapCompletionEvents,方法使用所属的jobID向其父TaskTracker询问此作业个Map任务  的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。返回一个数组TaskCompletionEvent events[]。TaskCompletionEvent包含taskid和ip地址之类的信息。

   (2)当获取到相关Map任务执行服务器的信息后,有一个线程MapOutputCopier开启,做具体的拷贝工作。它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。MapOutputCopier的run循环调用copyOutput,copyOutput又调用    getMapOutput,使用HTTP远程拷贝。

   (3)getMapOutput远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。

   (4)同时合并,还有一个内存Merger线程InMemFSMergeThread和一个文件Merger线程LocalFSMerger在同步工作,它们将下载过来的文件(可能在内存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为后续的排序工作减   负。InMemFSMergeThread的run循环调用doInMemMerge,该方法使用工具类Merger实现归并,如果需要combine,则combinerRunner.combine。

 2.Sort(其实相当于合并)

       排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过这一个流程,一个合并了所有所需Map任务输出文件的新文件产生了。而那些从其他各个服务器网罗过来的 Map任务输出文件,全部删除了。

 3.Reduce 
       1.Reduce任务的最后一个阶段。它会准备好Map的 keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"), valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator (“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。
       2.根据参数useNewAPI判断执行runNewReduce还是runOldReduce。分析润runNewReduce
       3.runNewReducer
                0.像报告进程书写一些信息
                1.获得一个TaskAttemptContext对象。通过这个对象创建reduce、output及用于跟踪的统计output的RecordWrit、最后创建用于收集reduce结果的Context
                2.reducer.run(reducerContext)开始执行reduce


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

hadoop作业执行流程及代码简略解读 的相关文章

随机推荐

  • 唐老师讲运算放大器(第五讲)——运放的应用

    一 常见运放的应用 二 运放用于电源降压 图示的D1为2 5V稳压管 若VIN 12V 那么运放的同相输入端为2 5V 又虚短可知 反向输入端的电压也为2 5V 那么Rfb2和Rfb1中间节点的电压为2 5V 此时 有运算放大器的性质可知
  • 知识蒸馏 (一) 综述

    一 综述 Knowledge Distillation A Survey 2021 IJCV Knowledge Distillation and Student Teacher Learning for Visual Intelligen
  • 《C和指针》笔记29:数组名和指针

    看下面的代码 int b 10 b 4 的类型是整型 但b的类型又是什么 它所表示的又是什么 一个合乎逻辑的答案是它表示整个数组 但事实并非如此 在C中 在几乎所有使用数组名的表达式中 数组名的值是一个指针常量 也就是数组第1个元素的地址
  • 斐波那契数列求和--C语言

    include
  • 服务器net0显示linkdown,某局点S10500交换机设备接口up/down的trap中没显示端口号信息...

    某客户使用我司S10500系列交换机 在使用过程中 用户配置了SNMP网管服务器 同时将设备的告警信息发送给网管服务器 在使用过程中发现 部分设备的接口变化的告警信息中没有具体接口号相关信息 只有接口索引值 如下为客户现场S10500交换机
  • 【Unity项目】登录界面

    数据库 登录时验证用户名和密码 使用MySQL数据库管理数据 类中的主要方法也都是使用SQL语句完成 类中还提供两个接口 一个通过username获取UserInfo 一个是直接获取到排行榜 数据库连接类 using System usin
  • Java中的Thread类

    目录 一 什么是Thread类 二 Thread类的基本用法 三 线程的并发执行 一 什么是Thread类 在java标准库中 提供了一个Thread类 用来表示 操作线程 Thread类可以视为是Java标准库提供的API Java是支持
  • img图片在限定尺寸下等比缩放;点击查看原图

    一 图片等比缩放 效果 html div class row img src div div class row div 查看原图 div div css 对img外层div尺寸限制 img长宽自适应 imgbox max width 10
  • Vue脚手架

    安装 前提 Node js安装 打开想要建脚手架的文件夹 把文件夹的地址改为cmd打开终端输入以下代码 1 执行npm install g vue cli安装脚手架 2 安装完成后 可以使用vue version 3 我们测试下 在桌面创建
  • 微软网盘onedrive

    将微软网盘 做成一个共享网站 好处就是可以分享大文件 而且还不限制下载速度 如果你受够了百度网盘的限速 可以试试这个 来看看常见的几款Onedrive网盘程序 1 OneIndex 第一个微软网盘分享程序 php编写 兼容性好 我在用 作者
  • Qt鼠标事件

    新建桌面应用程序testMouseEvent 类名MouseEvent 基类QMainWindow 通过重写鼠标事件达到跟踪鼠标位置的效果 添加2个标签做转态及位置更新用 重写鼠标事件 mouseevent h ifndef MOUSEEV
  • 早早踏入刷脸支付市场的人收获颇丰

    支付行业是永远不会萧条的行业 因为只要市场上存在交易 有资金流通就需要支付工具的支撑 不仅如此 支付行业的收入十分稳定 它产生的利润不会受商品价位变化的周期性影响 无论商家经营好坏 它的利润都是保持不变的 所以支付行业对于一些追求持续稳定利
  • 基于python的数字图像处理--学习笔记(三)

    基于python的数字图像处理 学习笔记 三 前言 一 灰度拉伸 二 幂律 伽马 变换 三 对数变换 前言 进入冈萨雷斯的第三章内容 并用python实现功能 我更改了代码源 之前找到太烂了 代码全是错 现在使用的代码很清晰 功能也很全 一
  • pytorch: where、gather函数

    一 where函数 torch where condition x y out x if condition is 1 y if condition is 0 In 29 cond torch rand 2 2 In 30 cond Out
  • Python-Numpy多维数组--切片,索引,高级索引,布尔索引

    一 Numpy 切片和索引 ndarray对象的内容可以通过索引或切片来访问和修改 就像 Python 的内置容器对象一样 如前所述 ndarray对象中的元素遵循基于零的索引 有三种可用的索引方法类型 字段访问 基本切片和高级索引 基本切
  • 苹果公司开始招人发力6G无线技术,你们怎么看?

    全球财经观察 新闻速递 看行业 根据彭博社的Mark Gurman发现的招聘信息 苹果正在招聘工程师 从事下一代6G无线技术的研发 根据Gurman的报道 这些职位位于苹果公司在硅谷和圣迭戈办公室 该公司在那里从事无线技术研发和芯片设计 新
  • 设计模式--工厂模式--抽象工厂模式

    工厂模式属于创建型模式基本原理 使用一个工厂类统一生产各种产品 主要流程 1 创建产品的基类 便于统一返回创建的产品 2 创建各种产品 继承基类 注意多态 3 创建工厂类 对每种产品进行区分创建 4 在使用时要先实例化工厂类 在调用期内生产
  • 洛谷-【入门1】顺序结构——C语言

    1 Hello World 题目描述 编写一个能够输出 Hello World 的程序 include
  • iframe 父子组件通信 vue3

    父组件
  • hadoop作业执行流程及代码简略解读

    hadoop作业执行流程及代码简略解读 本文 参考了网上的博文 出处也不知是哪里 不好意思 最近整理磁盘文档发现的好资料所以整理补充了一下供大家学习参考一下吧 1 主要组成部分 Hadoop包括hdfs与mapreduce两部分 hdfs则