Hadoop学习——MapReduce的job机制和job链介绍

2023-11-03

  前边写了MapReduce的介绍、以及四大组件、序列化机制和排序。
  这一篇记录一下MapReduce相关的job机制,对于在代码里,我们总要有一个Driver,比如下边:

public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		//获取job对象
		Job job = Job.getInstance(conf);
		//设置job方法入口的驱动类
		job.setJarByClass(ProfitDriver1.class);
		//设置Mapper组件类
		job.setMapperClass(ProfitMapper1.class);
		//设置mapper的输出key类型
		job.setMapOutputKeyClass(Text.class);
		//设置Mappper的输出value类型,注意Text的导包问题
		job.setMapOutputValueClass(IntWritable.class);
		//设置reduce组件类
		job.setReducerClass(ProfitReducer1.class);
		//设置reduce输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//设置输入路径
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/profit"));
		//设置输出结果路径,要求结果路径事先不能存在
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/profit/result"));
		//提交job
		job.waitForCompletion(true)
	}
}

  当我们在写完组件时,比如MapperReducer组件时,都会到这个类里,将它们定义到Job里,这样任务才能执行起来。那它的工作流程是啥呢?

一、job机制

1、job的介绍

  当我们在集群中,将我们写的Driver和这些组件类,打成jar包。执行这个 MapReducejar包时,比如下边的命令:

hadoop jar flow.jar

  执行后,大概的步骤如下(hadoop 1.0版本):

  1. 将该jar包临时上传到hdfs里的tmp目录下,然后解析jar包,查看环境及命令是否符合规范,符合则访问jobtracker
  2. jobtracker访问namenode获取执行文件的信息,然后对数据进行逻辑切片(一般该切片大小的设置与实际存储在datanode上的切块大小一致)。
  3. 切片之后,会按照切片数量,交给不同的tasktrackertasktracker按照自己获得的切片,会向datanode发起rpc请求。
  4. datanode会将块返回给tasktracker,创建任务,生成jobid,并执行,最后删除tmpjar包。

  这种即是mapreduce的job任务简单流程。

2、job任务的要点
  1. job任务在对数据进行逻辑切片时,一般与datanode的切块大小一致,因为如果两者不一致,势必会有map任务,需要从两个datanode上获取完整的数据,当split大于block,会引起数据的拷贝,从另一个datanode找剩余的数据,拷贝到mapreduce任务所在节点上,然后再执行。
  2. 因为namenode网络中访问量较大,一般jobtrackernamenode在一台机器上,减少带宽的影响,避免网络传输。
  3. 另外,常理上来说,datanodetasktracker也是会在同一台机器上,但如果两者数量不一致的时候,那对应可能就会处在其中任意一台,数据只能从其他机器上拷贝数据了。(这个我其实没缕明白。可能不对。)
3、Hadoop 1.X 版本的内部执行流程

  hadoop1.0 job任务的执行流程图如下:
在这里插入图片描述
第一阶段:Run Job

  该阶段由客户端来完成,底层有一个jobClient类来做具体实现。该阶段主要完成:

  ①、做job环境信息的收集,比如各个组件类,以及输出的key、value类型,然后检测是否合法;

  ②、检测输入的路径是否合法,以及输出结果路径的合法性;

  如果以上检查未通过,则直接报错,不会做后续的job提交动作。

第二阶段:Get New Job ID

  如果第一阶段run Job检测通过,jobClient会向JobTracker为当前job申请一个jobID,jobID是全局唯一的,用于标识一个job。

第三阶段:Copy Job Resources

  这个阶段是JobClient把job的运算资源上传到HDFS。路径为:/tmp/hadoop-yarn/staging/UserName/

  运算资源包括:①、jar包 ②、文件的切片信息 ③、job.xml整个job的环境参数

第四阶段:Submit job

  当jobClient将运算资源上传到HDFS之后,提交Job。

第五阶段:Initialise Job

  初始化Job信息

第六阶段:Retrieve Input Splits

  获取Job的切片数量

  注:第五第六两个阶段的目的在于获取整个Job的MapTask任务数量和ReduceTask的任务数量,MapTask任务数量=切片(Split)数量,ReduceTask的任务数量为代码中设定,默认为1。

第七阶段:HeartBeat(Return Tasks)

  TaskTracker通过心跳机制向jobTracker领取任务,要满足数据本地化策略,即优先领取保存在它所在服务器上的那部分数据块。切片和切块有所不同。

  切片是一个对象(FileSplit),封装的是一个块的描述信息;切块是文件块,里面存储的是真正的文件,切块是真实的数据。

  补充: MR框架虽然在处理任务时满足数据本地化策略,但是,为了确保读取的完整性,也会做行的追溯(因为切块不是根据行切,而是根据大小切,肯定会有数据断开的情况),这个过程肯定会发生网络数据的传输。

第八阶段:Retrieve Job Resources

  TaskTracker去HDFS下载Job的运行资源。体现的思想是:移动的是运算(运算资料),而不是数据,也是尽可能的节省带宽。

第九阶段:Launch

  启动JVM进程。

第十阶段:Run

  运行MapTaskReduceTask

二、 job链

1.介绍

  有这么一种场景,比如写了一套MapperReducer组件,但是完不成我们的最后的计算工作,可能还需要将前边的结果集,再作为源数据进行计算。这样的话可能就需要多个job依次执行,这样就需要用到了job链的配置。

2.job链的写法

  找到Driver 类,在最后job.waitForCompletion(true) ,将其改为如下:

public class ProfitDriver1 {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		//获取job对象
		Job job = Job.getInstance(conf);
		//设置job方法入口的驱动类
		job.setJarByClass(ProfitDriver1.class);
		//设置Mapper组件类
		job.setMapperClass(ProfitMapper1.class);
		//设置mapper的输出key类型
		job.setMapOutputKeyClass(Text.class);
		//设置Mappper的输出value类型,注意Text的导包问题
		job.setMapOutputValueClass(IntWritable.class);
		//设置reduce组件类
		job.setReducerClass(ProfitReducer1.class);
		//设置reduce输出的key和value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//设置输入路径
		FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop01:9000/profit"));
		//设置输出结果路径,要求结果路径事先不能存在
		FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/profit/result"));
		//提交job
		//使用了MR框架提供的job链机制
		if(job.waitForCompletion(true)){
			Job job2 = Job.getInstance(conf);
			job2.setMapperClass(ProfitMapper2.class);
			job2.setMapOutputKeyClass(Profit.class);
			job2.setMapOutputValueClass(NullWritable.class);
			FileInputFormat.setInputPaths(job2, new Path("hdfs://hadoop01:9000/profit/result"));
			FileOutputFormat.setOutputPath(job2, new Path("hdfs://hadoop01:9000/profit/result1"));
			job2.waitForCompletion(true);
		};
	}
}

以上,即简单实现了多个job一次启动的问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hadoop学习——MapReduce的job机制和job链介绍 的相关文章

  • 线程“main”中出现异常java.lang.UnsupportedClassVersionError,不支持的major.minor版本52.0

    我尝试在 hadoop 1 0 4 上运行 WordCount 示例 但收到以下错误 Exception in thread main java lang UnsupportedClassVersionError WordCount Uns
  • 当与curl一起使用--negotiate时,是否需要keytab文件?

    The 文档 http hadoop apache org docs stable hadoop project dist hadoop hdfs WebHDFS html描述如何连接到 kerberos 安全端点显示以下内容 curl i
  • Spark 2.0 弃用了“DirectParquetOutputCommitter”,没有它如何生活?

    最近 我们从 HDFS 上的 EMR gt S3 上的 EMR 启用了一致视图的 EMRFS 迁移 我们意识到 Spark SaveAsTable 镶木地板格式 写入 S3 的速度比 HDFS 慢约 4 倍 但我们发现使用 DirectPa
  • Hadoop setInputPathFilter错误

    我正在使用 Hadoop 0 20 2 无法更改 并且我想向我的输入路径添加一个过滤器 数据如下 path1 test a1 path1 test a2 path1 train a1 path1 train a2 我只想处理所有文件trai
  • 无法使用 PDI 步骤连接到 HDFS

    我已经配置成功了Hadoop 2 4 in an Ubuntu 14 04 虚拟机 from a 视窗8系统 Hadoop 安装工作绝对正常 而且我还可以从 Windows 浏览器查看 Namenode 附图如下 所以 我的主机名是 ubu
  • Sqoop Import --password-file 功能在 sqoop 1.4.4 中无法正常工作

    我使用的是hadoop 1 2 1 sqoop版本是1 4 4 我正在尝试运行以下查询 sqoop import connect jdbc mysql IP 3306 database name table clients target d
  • Curl下载到HDFS

    我有这个代码 curl o fileName csv url xargs hdfs dfs moveFromLocal 1 somePath 当我执行此代码时 curl 将请求中的值放入 fileName csv 中 该文件将移动到 HDF
  • Hadoop 安全模式恢复 - 花费太长时间!

    我有一个包含 18 个数据节点的 Hadoop 集群 我在两个多小时前重新启动了名称节点 并且名称节点仍处于安全模式 我一直在寻找为什么这可能花费太长时间 但找不到好的答案 发帖在这里 Hadoop 安全模式恢复 花费大量时间 https
  • 将多个前缀行过滤器设置为扫描仪 hbase java

    我想创建一台扫描仪 它可以为我提供带有 2 个前缀过滤器的结果例如 我想要其键以字符串 x 开头或以字符串 y 开头的所有行 目前我知道只能使用一个前缀 方法如下 scan setRowPrefixFilter prefixFiltet 在
  • Hive查询快速查找表大小(行数)

    是否有 Hive 查询可以快速查找表大小 即行数 而无需启动耗时的 MapReduce 作业 这就是为什么我想避免COUNT I tried DESCRIBE EXTENDED 但这产生了numRows 0这显然是不正确的 对新手问题表示歉
  • Sqoop mysql错误-通信链路故障

    尝试运行以下命令 sqoop import connect jdbc mysql 3306 home credit risk table bureau target dir home sqoop username root password
  • 猪如何过滤不同的对(对)

    我是猪的新手 我有一个 Pig 脚本 它在两个元素之间生成制表符分隔的对 每行一对 例如 John Paul Tom Nik Mark Bill Tom Nik Paul John 我需要过滤掉重复的组合 如果我使用 DISTINCT 我会
  • Namenode高可用客户端请求

    谁能告诉我 如果我使用java应用程序请求一些文件上传 下载操作到带有Namenode HA设置的HDFS 这个请求首先去哪里 我的意思是客户端如何知道哪个名称节点处于活动状态 如果您提供一些工作流程类型图或详细解释请求步骤 从开始到结束
  • hadoop中reducer的数量

    我正在学习hadoop 我发现减速器的数量非常令人困惑 1 reducer的数量与partition的数量相同 2 reducer 的数量是 0 95 或 1 75 乘以 节点数 每个节点的最大容器数 3 减速机数量设定为mapred re
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • 猪参考

    我正在学习 Hadoop Pig 并且我总是坚持引用元素 请查找下面的示例 groupwordcount group chararray words bag of tokenTuples from line token chararray
  • 伪分布式模式下的 Hadoop。连接被拒绝

    P S 请不要将此标记为重复 Hi 我一直在尝试以伪分布式模式设置和运行 Hadoop 当我运行 start all sh 脚本时 我得到以下输出 starting namenode logging to home raveesh Hado
  • 将数据从 oracle 移动到 HDFS,处理并从 HDFS 移动到 Teradata

    我的要求是 将数据从 Oracle 移至 HDFS 处理HDFS上的数据 将处理后的数据移至 Teradata 还需要每 15 分钟执行一次整个处理 源数据量可能接近50GB 处理后的数据也可能相同 在网上搜索了很多之后 我发现 PRARO
  • 非 hdfs 文件系统上的 hadoop/yarn 和任务并行化

    我已经实例化了 Hadoop 2 4 1 集群 并且发现运行 MapReduce 应用程序的并行化方式会有所不同 具体取决于输入数据所在的文件系统类型 使用 HDFS MapReduce 作业将生成足够的容器 以最大限度地利用所有可用内存
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测

随机推荐

  • HTML小白入门学习1

    目录 一 什么是HTML 二 HTML的语法 2 1 基本结构 2 1 1 基本结构的解释 三 实体 3 1 什么是实体 3 2 实体的语法 四 常用的标签 4 1 常用标签 目录 一 什么是HTML 二 HTML的语法 2 1 基本结构
  • C# 调用可执行exe文件几种方法小结

    1 利用进程池方式启动 string exefile xxx exe if File Exists exefile Process process new Process params 为 string 类型的参数 多个参数以空格分隔 如果
  • snprintf和strcpy和strncpy的区别

    概述 snprintf strcpy strncpy这几个函数的功能都是将原字符串拷贝到目的字符串中 但是在细节部分还是存在着一些细微的差别 主要参考man说明 snprintf 格式 int snprintf char str size
  • 最长公共子序列 (LCS) 详解+例题模板(全)

    欢迎访问https blog csdn net lxt Lucia 宇宙第一小仙女 o 萌量爆表求带飞 o dalao们点个关注呗 我只是一条可爱哒分界线 1 摘要 继上篇最长上升子序列后 本篇主要讲述最长公共子序列 LCS 2 LCS定义
  • HMC管理IBM小机

    IBM P5小机的HMC1和HMC2的IP地址默认为192 168 2 147和192 168 3 147 如果对此默认IP做过改动 后来又遗忘的情况下 通过进入P5小机液晶控制面板上的功能30 就能读出HMC端口的IP地址 具体步骤如下
  • WSI图像分割

    0 介绍 Whole Slide Image WSI 图像非常的大 处理起来比较麻烦 在深度学习中的病理切片图像大多数在 10万x10万分辨率 用平常的图像处理库没有办法读取 openslide 提供了一个很好的接口 这里介绍一个可用于处理
  • void*强制类型转换的应用(自己看吧)

    int main int num 20 void value NULL value num printf d n int value return 0 int test void p return int p int main int nu
  • Java—面向对象设计—类和对象

    理解面向对象程序设计 面向对象程序 Object oriented programming OOP 设计是继面向过程又一具有里程碑意义的编程思想 是现实世界模型的自然延伸 下面从结构化程序设计说起 逐步展示面向对象程序设计 结构化程序设计简
  • 深度学习的异构加速技术(一):AI 需要一个多大的“心脏”?

    欢迎大家前往腾讯云社区 获取更多腾讯海量技术实践干货哦 作者 kevinxiaoyu 高级研究员 隶属腾讯TEG 架构平台部 主要研究方向为深度学习异构计算与硬件加速 FPGA云 高速视觉感知等方向的构架设计和优化 深度学习的异构加速技术
  • RabbitMQ之延迟队列

    RabbitMQ之延迟队列 1 延迟队列概念 2 延迟队列使用场景 3 RabbitMQ 中的 TTL 3 1 消息设置 TTL 3 2 队列设置 TTL 3 3 两者的区别 4 整合 SpringBoot 4 1 创建项目 4 2 添加依
  • 深入解析C/C++的优缺点以及就业方向

    众所周知C C Java Python都是主流的后端开发语言 并且不同的语言 具备不同的优缺点以及就业方向 解析来小编带你一起看看C C 的优缺点以及就业方向 深入解析C C 的优缺点以及就业方向 C语言是很多语言的鼻祖 所以学会C语言 非
  • Welcome to Level 3 (不容易啊...记下啦^_@)

    Welcome to Level 3 Hey clin003 Congratulations Don t forget to check your Mod X ranking and access the level 3 forum to
  • 结合入栈出栈浅谈前序遍历,中序遍历,后序遍历

    二叉树深度遍历 讨巧应付面试 以前其实也懂什么叫二叉树的前中后序遍历 反正面试的时候 给我一个二叉树 我是知道怎么写出他们的答案的 例如如下二叉树 前序遍历 A B D E C F G 中序遍历 D B E A F C G 后序遍历 D E
  • Infinite Fraction Path【HDU-6223】【BFS+剪枝】

    题目链接 训练赛的时候 想到的做法是倍增维护 因为每个点的后继是唯一的 然后又因为不会桶排 所以的复杂度是一定会TLE的 难受 听说桶排还是会被卡 大雾 然后下来补题的时候听了队友的意见 其实比赛的时候就应该多听听 也许就能想到这个bfs了
  • 某返利网站admin目录index.php文件混淆加密算法分析

    恢复内容开始 文件已经加密 可以在此下载 index php 文件内容打开大概如此 简单字符替换之后 发现字符串用base64 decode仍无法解码 找到一个解码网站 找源码 解码后的文件如下 下载地址 尾部仍然有大量未知编码内容 简单修
  • IP协议的服务类型(翻译RFC 1349)

    目录 1 简介 2 目标和理念 3 服务八位字节类型规范 4 TOS字段的规范 5 Internet协议中TOS字段的使用 5 1 Internet控制消息协议 ICMP 5 2传输协议 5 3应用协议 6 ICMP和TOS字段 6 1无法
  • 圆投影匹配算法描述及实现

    解决模板图和基准图之间存在任意角度旋转的景象匹配问题的关键是找到一个旋转不变量 圆投影匹配算法就是利用 圆 的各向同性和投影特征提出来的 传统的图像匹配算法 如归一化交叉互相关算法 主要利用像素点与像素点之间的相关性计算匹配图像与模板之间的
  • 数据结构day4

    https note youdao com s SwyFfOgRhttps note youdao com s SwyFfOgR 学生管理系统 头文件 ifndef STULIST H define STULIST H include
  • Qt布局的使用

    1 控件的sizePolicy设置了Fixed 再一点击设置布局 控件的大小又自动调整了 解决方法 设置布局后 再手动调整控件大小 2 在要布局的部件上 右键 gt Lay out 如果该部件不支持布局 则右键菜单中不会出现 Lay out
  • Hadoop学习——MapReduce的job机制和job链介绍

    前边写了MapReduce的介绍 以及四大组件 序列化机制和排序 这一篇记录一下MapReduce相关的job机制 对于在代码里 我们总要有一个Driver 比如下边 public static void main String args