05-分布式计算框架

2023-11-17

目录

一,MapReduce

1,简介

2,原理

2.1 基本概念

2.2 程序执行过程

2.3 作业运行模式

二,Spark

1,简介

1.1 背景

1.2 概念

1.3 特点

2,原理

2.1 编程模型

2.2 运行模式

2.3 运行过程

2.4 DAG任务规划与调度


文章内容来自:南京大学 / 星环科技课程,大数据理论与实践课程Ⅰ

对细节部分引用其他网络资源进行补充。

一,MapReduce

1,简介

MR是面向离线批处理的分布式计算框架

核心思想:分而治之,并行计算。移动计算,非移动数据;

适用场景

  • 数据统计,如网站的PV、UV统计
  • 搜索引擎构建索引
  • 海量数据查询
  • 复杂数据分析算法实现

不适用场景

  • OLAP:要求毫秒或秒级返回结果
  • 流计算:输入数据集是动态的,而MapReduce是静态的
  • DAG计算
  • -多个任务之间存在依赖关系,后一个的输入是前一个的输出,构成DAG有向无环图
  • -MapReduce很难避免Suffle,造成大量磁盘IO,导致性能较为低下

2,原理

2.1 基本概念

1,Job & Task(作业与任务)

  • 作业是客户端请求执行的一个工作单元。包括输入数据、MapReduce程序、配置信息
  • 任务是将作业分解后得到的细分工作单元。分为Map任务和Reduce任务

2,Split(切片)

  • 输入数据被划分成等长的小数据块,称为输入切片(Input Split),简称切片
  • Split是逻辑概念,仅包含元数据信息,如数据的起始位置、长度、所在节点等
  • 每个Split交给一个Map任务处理,Split的数量决定Map任务的数量
  • Split大小默认等于HDFS Block大小,Split的划分方式由程序设定,Split与HDFS Block没有严格的对应关系。Split越小,Map任务越多,并发度越高,但开销也越大;Split越大,任务越少,并发度降低

3,Map阶段(映射)

  • 由若干Map任务组成,任务数量由Split数量决定
  • 输入:Split切片(key-value) 。输出:中间计算结果(key-value)

4,Reduce阶段(化简)

  • 由若干Reduce任务组成,任务数量由程序指定
  • 输入:Map阶段输出的中间结果(key-value)。输出:最终结果(key-value) 

5,Shuffle阶段(混洗)

  • Shuffle是Map和Reduce之间的强依赖关系(Shuffle依赖)导致的,即每个Reduce的输入依赖于所有Map的输出
  • Map和Reduce阶段的中间环节(虚拟阶段),分为Map端Shuffle和Reduce端Shuffle
  • 包括Partition(分区)、Sort(排序)、Spill(溢写)、Merge(合并)和Fetch(抓取)等工作

Partition(分区)

  • Reduce任务数量决定了Partition数量,Partition编号 = Reduce任务编号
  • 利用“哈希取模”对Map输出数据分区,即Partition编号 = key hashcode % reduce task num(%为取模)
  • Partition为具有相同编号的Reduce任务供数

哈希取模的作用

  • 数据划分:将一个数据集随机分成若干个子集(Hash函数选择不当可能造成数据倾斜)
  • 数据聚合:将Key相同的数据聚合在一起

避免和减少Shuffle是MapReduce程序调优的关键

2.2 程序执行过程

1,MR执行过程

 

2,shuffle详解

注意:在溢写之前需要先进行排序(便于后续归并排序),MapTask结束后仍需要通过归并排序将所有溢写文件合并为一个文件。

Map端

1,Map任务将中间结果写入环形内存缓冲区Buffer(默认100M);

2,当Buffer的数据量达到阈值(默认80%)时,对缓冲区内数据进行分区(Partition)和排序(Sort)。 先按“key hashcode % reduce task num”对数据进行分区,分区内再按key排序。然后将数据溢写(Spill)到磁盘的一个临时文件中。如果在溢写过程中,剩余20%的空间又被耗尽,这时就会触发panding,等80%空间腾出来之后再继续写;

3,Map任务结束前,将多个临时文件合并(Merge)为一个Map输出文件,文件内数据先分区后排序

Reduce端

1,Reduce任务从多个Map输出文件中抓取(Fetch)属于自己的分区数据(Partition编号=Reduce任务编号)

2,对抓取到的分区数据做归并排序,生成一个Reduce输入文件(文件内数据按key排序)

  • 如果内存缓冲区够大,就直接在内存中完成归并排序,然后落盘
  • 如果内存缓冲区不够,先将分区数据写到相应的文件中,再通过归并排序合并为一个大文件

关于环形缓冲区的介绍可以参考这里@大数据架构师Evan【设计思想赏析-MapReduce环形缓冲区】

2.3 作业运行模式

1,JobTracker/TaskTracker模式(Hadoop 1.X)

 

JobTracker节点(Master)

  • 调度任务在TaskTracker上运行
  • 若任务失败,指定新TaskTracker重新运行

TaskTracker节点(Slave)

  • 执行任务,发送进度报告

存在的问题

  • JobTracker存在单点故障
  • JobTracker负载太重(上限4k节点)
  • JobTracker缺少对资源的全面管理
  • TaskTracker对资源的描述过于简单
  • 源码难于理解

 

2,YARN模式(Hadoop 2.X )

二,Spark

1,简介

1.1 背景

MapReduce有较大的局限性

  • 仅支持Map、Reduce两种语义操作,划分为两个阶段(模型较为粗糙)
  • 执行效率低,时间开销大(很难避免Shuffle)
  • 主要用于大规模离线批处理
  • 不适合迭代计算、在线分析、实时流处理等场景

计算框架种类多,选型难,学习成本高

  • 批处理:MapReduce
  • 流处理:Storm、Flink
  • 在线分析:Impala、Presto
  • 机器学习:Mahout

统一计算框架,简化技术选型,降低学习成本

  • 在统一框架下,实现离线批处理、流处理、在线分析和机器学习

1.2 概念

由加州大学伯克利分校的AMP实验室开源

  

高性能的分布式通用计算引擎

  • Spark Core:核心计算框架
  • Spark SQL:结构化数据查询
  • Spark Streaming:实时流处理
  • Spark MLib:机器学习
  • Spark GraphX:图计算

具有高吞吐、低延时、通用易扩展、高容错等特点

采用Scala语言开发

提供多种运行模式

1.3 特点

计算高效

  • 语义操作多样,模型设计精细
  • 利用RDD内存计算Cache缓存机制,支持迭代计算数据共享,减少数据读取的IO开销
  • 利用DAG引擎,减少中间计算结果写入HDFS的开销
  • 利用多线程池模型,减少任务启动开销,避免Shuffle中不必要的排序和磁盘IO操作

  

通用易用

  • 适用于批处理、流处理、在线分析、机器学习等场景
  • 提供了丰富的开发API,支持Scala、Java、Python、R等

运行模式多样

  • Local模式
  • Standalone模式
  • YARN/Mesos模式

2,原理

2.1 编程模型

1,RDD(Resilient Distributed Datesets) 弹性分布式数据集

RDD相当于Table,由分布在集群中的多个Partition组成

Partition(分区)

  • 分布在集群的不同节点中
  • 只读数据集
  • 通过转换操作来构造
  • 失效后自动重构(弹性)
  • 存储在内存磁盘中

Spark基于RDD进行计算

 

2,RDD操作(Operator)

Transformation(转换)

  • 将Scala集合或Hadoop输入数据构造成一个新RDD
  • 通过已有的RDD产生新RDD
  • 惰性执行:只记录转换关系,不触发计算
  • 例如:map、filter、flatmap、union、distinct、sortbykey

Action(动作)

  • 通过RDD计算得到结果或者落盘
  • 真正触发计算
  • 例如:first、count、collect、foreach、saveAsTextFile

以rdd1.map(_,+1).saveAsTextFile(“hdfs://node01:9000”)为例

3,RDD依赖(Dependency)

窄依赖(Narrow Dependency)

  • 每个父RDD分区只能为一个子RDD分区供数,
  • 子分区所依赖的父分区集合之间没有交集
  • 子RDD分区数据丢失或损坏,从其依赖的父RDD分区重新计算即可,无需Shuffle
  • 例如:map、filter、union

宽依赖(Wide/Shuffle Dependency)

  • 每个父RDD分区为所有子RDD分区供数
  • 子RDD区数据丢失或损坏,从所有父RDD分区重新计算,必须Shuffle
  • 相对于窄依赖,宽依赖付出的代价要高很多,尽量避免使用
  • 例如:groupByKey、reduceByKey、sortByKey

 

4,示例:WordCount

val rdd1 = sc.textFile(“hdfs://node01:9000/data/in”)

val rdd2 = rdd1.flatMap(_.split(“\t”))

val rdd3 = rdd2.map((_,1))

val rdd4 = rdd3.reduceByKey((_+_))

rdd4.saveAsTextFile(“hdfs://node01:9000/data/out”)

 

 

2.2 运行模式

1,抽象模式

Driver

  • 每个Spark作业启动一个Driver,每个Driver创建一个SparkContext
  • 负责解析Spark程序、划分Stage、调度任务到Executor上执行

SparkContext

  • 负责加载配置信息,初始化运行环境,创建DAGScheduler和TaskScheduler
  • DAGScheduler:根据任务依赖建立DAG、根据宽依赖划分Stage、提交TaskSet
  • TaskScheduler:任务调度和监管

Executor

  • 负责执行Driver分发的任务,一个节点可以启动多个Executor,每个Executor通过多线程运行多个任务

Task

  • Spark运行的基本单位,一个Task负责处理若干RDD分区的计算逻辑

2,Local模式

单机运行,通常用于测试

Spark程序以多线程方式直接运行在本地

3,Standalone模式

Spark集群独立运行,不依赖于第三方资源管理系统,如YARN、Mesos

采用Master/Slave架构

  • Master统一管理集群
  • Driver在Worker中运行

ZooKeeper负责Master HA,避免单点故障

适用于集群规模和数据量都不大的情况

4,YARN模式

YARN-Client模式:适用于交互和调试

YARN-Cluster模式:适用于生产环境 

区别在于Driver位置。Driver放在Client主要便查看日志,便于调试。

  

 

2.3 运行过程

1,生成逻辑计划

 

2,生成物理计划

3,任务调度与执行

 

2.4 DAG任务规划与调度

DAG(Directed Acyclic Graph)

  • 有向无环图DAG:一个有向图无法从任意顶点出发经过若干条边回到该点
  • 受制于某些任务必须比另一些任务早执行的约束,Spark程序的内部执行逻辑可由DAG描述,节点代表任务,边代表任务间的依赖约束

DAGScheduler

  • 根据任务的依赖关系建立DAG
  • 根据依赖关系是否为宽依赖,即是否存在Shuffle,将DAG划分为不同的阶段(Stage)
  • 将各阶段Task组成的TaskSet提交给TaskScheduler

TaskScheduler

  • 负责任务调度
  • 重新提交失败的Task
  • 为执行速度慢的Task启动备用Task

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

05-分布式计算框架 的相关文章

  • MapReduce - 如何按值对减少输出进行排序

    如何按值降序对减速器输出进行排序 我正在开发一个必须返回最常听的歌曲的应用程序 因此 歌曲必须按照收听次数排序 我的应用程序以这种方式工作 Input songname userid boolean MapOutput songname u
  • Spark 在 Hbase 的 InputSplit 期间给出空指针异常

    我正在使用 Spark 1 2 1 Hbase 0 98 10 和 Hadoop 2 6 0 从 hbase 检索数据时出现空点异常 找到下面的堆栈跟踪 sparkDriver akka actor default dispatcher 2
  • 我如何调试 Hadoop MapReduce [重复]

    这个问题在这里已经有答案了 我正在尝试构建一个地图缩减作业 它运行完成 但最后呈现奇怪的数据 当我尝试使用 system out println debug data 调试它时 它没有显示在屏幕上 使用 java API 生成外部日志文件
  • 如何用hadoop实现自连接/叉积?

    对成对的项目进行评估是常见的任务 示例 重复数据删除 协同过滤 相似项目等 这基本上是具有相同数据源的自连接或叉积 要进行自连接 您可以遵循 减少端连接 模式 映射器将连接 外键作为键发出 将记录作为值发出 因此 假设我们想要对以下数据的
  • 如何在 Spark 中从文本文件创建 DataFrame

    我在 HDFS 上有一个文本文件 我想将其转换为 Spark 中的数据帧 我正在使用 Spark 上下文加载文件 然后尝试从该文件生成各个列 val myFile sc textFile file txt val myFile1 myFil
  • Spark 选择 RDD 中的最高值

    原始数据集是 numbersofrating title avg rating newRDD 3 monster 4 4 minions 3D 5 我想在newRDD中选择前N个avg ratings 我使用以下代码 它有一个错误 sele
  • 使用 MongoDB 的 MapReduce 选择不同的多个字段

    我想在 MongoDB 上执行这个 SQL 语句 SELECT DISTINCT book author from library 到目前为止 MongoDB 的 DISTINCT 一次仅支持一个字段 对于多个字段 我们必须使用 GROUP
  • Log4j RollingFileAppender 未将映射器和减速器日志添加到文件中

    我们希望将应用程序日志打印到本地节点上的文件中 我们使用 Log4j 的 RollingFileAppender Our log4j properties文件如下 ODS LOG DIR var log appLogs ODS LOG IN
  • rdd后面的数字是什么意思

    rdd后面括号里的数字是什么意思 RDD后面的数字是它的标识符 Welcome to version 2 3 0 Using Scala version 2 11 8 OpenJDK 64 Bit Server VM Java 1 8 0
  • 访问 Scala 中可用但 PySpark 中不可用的依赖项

    我正在尝试访问 RDD 的依赖项 在 Scala 中 这是一个非常简单的代码 scala gt val myRdd sc parallelize 0 to 9 groupBy 2 myRdd org apache spark rdd RDD
  • FAILED 错误:java.io.IOException:所有收集器的初始化失败

    我在运行 MapReduce WordCount 作业时遇到一些错误 错误 java io IOException 所有收集器的初始化 失败的 最后一个收集器中的错误是 class wordcount wordmapper at org a
  • CouchDB“加入”两个文档

    我有两个看起来有点像这样的文档 Doc id AAA creator id data DataKey id credits left 500 times used 0 data id AAA 我想要做的是创建一个视图 它允许我传递 Data
  • 两个相等的组合键不会到达同一个减速器

    我正在使用 MapReduce 框架用 Java 制作 Hadoop 应用程序 我仅使用文本键和值进行输入和输出 在减少最终输出之前 我使用组合器进行额外的计算步骤 但我有一个问题 钥匙没有进入同一个减速器 我在组合器中创建并添加键 值对
  • hadoop map reduce 中的错误处理

    根据文档 有几种方法可以在 MapReduce 中执行错误处理 以下是一些 A 使用枚举的自定义计数器 每个失败记录的增量 b 记录错误并稍后分析 计数器给出失败记录的数量 然而 为了获取失败记录的标识符 可能是其唯一键 以及发生异常的详细
  • 为什么 Spark 在字数统计时速度很快? [复制]

    这个问题在这里已经有答案了 测试用例 Spark 在 20 秒以上对 6G 数据进行字数统计 我明白映射减少 FP and stream编程模型 但无法弄清楚字数统计的速度如此惊人 我认为这种情况下是I O密集型计算 不可能在20秒以上扫描
  • Hive 上的自定义 MapReduce 程序,规则是什么?输入和输出怎么样?

    我被困了几天 因为我想根据我在 hive 上的查询创建一个自定义的地图缩减程序 在谷歌搜索后我发现没有太多例子 而且我仍然对规则感到困惑 创建自定义 MapReduce 程序的规则是什么 映射器和减速器类怎么样 任何人都可以提供任何解决方案
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • 从 Eclipse 在 AWS-EMR 上运行 MapReduce 作业

    我在 Eclipse 中有 WordCount MapReduce 示例 我将其导出到 Jar 然后将其复制到 S3 然后我在 AWS EMR 上运行它 成功地 然后 我读到了这篇文章 http docs aws amazon com El
  • RavenDB:为什么我会在此多重映射/归约索引中获得字段空值?

    受到 Ayende 文章的启发https ayende com blog 89089 ravendb multi maps reduce indexes https ayende com blog 89089 ravendb multi m
  • 适用于 Hadoop 的 DynamoDB 输入格式

    我必须使用 Hadoop mapreduce 处理保留在 Amazon Dynamodb 中的一些数据 我在互联网上搜索 Dynamo DB 的 Hadoop InputFormat 但找不到它 我对 Dynamo DB 不熟悉 所以我猜测

随机推荐

  • C练题笔记之:Leetcode-565. 数组嵌套

    题目 索引从0开始长度为N的数组A 包含0到N 1的所有整数 找到最大的集合S并返回其大小 其中 S i A i A A i A A A i 且遵守以下的规则 假设选择索引为i的元素A i 为S的第一个元素 S的下一个元素应该是A A i
  • 栈的 创建,入栈,出栈,清空栈,遍历栈 的实现

    数据结构 的学习视频 https www bilibili com video av6159200 from search seid 6709590585276522157 一 算法 栈 数据进出 类向箱子放东西和拿东西 先进后出 或者说后
  • OpenGL Vertex Buffer Objects(VBOs)

    OpenGL Vertex Buffer Objects VBOs 分类 OpenGL2010 05 20 12 53 3714人阅读 评论 13 收藏 举报 buffer float list struct 存储 工作 原创文章转载请注明
  • python 异常之 ValueError: invalid literal for int() with base 10: ‘xxx‘

    文章目录 1 异常例子 2 源代码 3 int x base 的正确使用方法 1 异常例子 代码 if name main print int 123 print int aaa 执行代码 2 源代码 可以看到 在执行 print int
  • LINUX 防火墙iptables常用指令

    封单个IP的命令 iptables I INPUT s 124 115 0 199 j DROP 封IP段的命令 iptables I INPUT s 124 115 0 0 16 j DROP 封整个段的命令 iptables I INP
  • “字符串的展开”【题解】

    字符串的展开 的题目 题目 题目描述 在初赛普及组的 阅读程序写结果 的问题中 我们曾给出一个字符串展开的例子 如果在输入的字符串中 含有类似于 d h 或者 4 8 的字串 我们就把它当作一种简写 输出时 用连续递增的字母或数字串替代其中
  • golang 读取yaml配置文件中的数据 两种方式:yaml.v2 和 Viper

    golang 读取yaml配置文件中的数据 yaml 配置文件 config yaml 中 写数据 app host 127 0 0 1 port 3306 username admin password admin log suffix
  • 110. 平衡二叉树

    给定一个二叉树 判断它是否是高度平衡的二叉树 本题中 一棵高度平衡二叉树定义为 一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过 1 Definition for a binary tree node public class Tr
  • VScode设置字体大小

    VScode如何设置字体大小 第一步 首先打开vscode 在vscode的左下角有一个设置按钮 单机打开 选择settings选项 第二步 在设置中查找font 字体 选项 并打开 第三步 在font选项内 选择font size 就可以
  • jQuery验证码插件:jquery.idycode.js

    对于任何一个又评论功能的网站来说 验证码都是重中之重 没有验证码的话 用户就可以肆意刷评论 甚至是通过一些工具来操作 会对网络环境产生极大的危害 验证码这个词最早是在2002年由卡内基梅隆大学的路易斯 冯 安 Manuel Blum Nic
  • 标识符和关键字应该如何理解?

    思考 为什么语言中需要关键字和表示符 程序来源于生活 想想我们人类在生产生活过程中的一些语言使用都有其特定的含义 而每个事物或者事物的一些属性功能也都需要给予特定的语言符号来表示 故java语言的发明者们按照人类的方式创造除了一门值得大家学
  • 分布式、微服务概念

    目录 1 目前软件架构大致分类 2 各种架构技术方法 3 什么是微服务 4 微服务架构特点 5 什么是SOA 6 SOA架构特点 7 SOA架构和微服务架构的区别 8 ESB和微服务API网关 9 什么是分布式 10 什么是集群 11 负载
  • R语言使用cumsum函数计算向量数据的累加和(cumulative sum )

    R语言使用cumsum函数计算向量数据的累加和 cumulative sum 目录 R语言使用cumsum函数计算向量数据的累加和 cumulative sum
  • glsl version 300es 关键字

    参考链接 GLSL ES Specification 3 00 变量名 不能要以gl 开头 注释 或 关键字 void float int uint bool void function name float var name 1 uint
  • JS混淆技术探究及解密方法分析

    随着Web技术的快速发展 JavaScript被广泛应用于网页开发 移动应用开发等领域 然而 JavaScript代码很容易被反编译 解密 这给保护网站和应用程序的安全性带来了严重的挑战 为了解决这个问题 JS混淆技术应运而生 JS混淆就是
  • Redux持久化插件-解决刷新页面数据丢失问题

    最近在使用react的时候有用到redux 对数据进行全局的状态管理 但是发现和vuex一样会出现刷新之后数据丢失的问题 于是在github上面查阅了 redux persist 插件 使用redux persist进行持久化数据存储 通常
  • Rstudio更换主题/样式

    github项目地址 https github com gadenbuie rsthemes 安装 在 rstudio 的控制台console中数据 install packages devtools devtools install gi
  • 为什么会说:程序员年龄越大,越容易失业?

    在程序员的世界里 一直有一个传言 互联网公司没有35岁以上的中年人 从华为辞退34岁以上的员工 到腾讯辞退35 高级员工 似乎老程序员面临着 年龄危机 要时刻警惕在职场被 踢出 的危险 而国内其他很多职业 比如教师 医生 公务员 都在稳步发
  • python TCP通信雷达实时解析数据

    雷达解析程序 coding cp936 import socket import re class jiema def yushe3 self receve r receve av receve v receve h while True
  • 05-分布式计算框架

    目录 一 MapReduce 1 简介 2 原理 2 1 基本概念 2 2 程序执行过程 2 3 作业运行模式 二 Spark 1 简介 1 1 背景 1 2 概念 1 3 特点 2 原理 2 1 编程模型 2 2 运行模式 2 3 运行过