Spark 的Shuffle过程详解

2023-11-01

一、Shuffle的作用是什么?

Shuffle的中文解释为“洗牌操作”,可以理解成将集群中所有节点上的数据进行重新整合分类的过程。其思想来源于hadoop的mapReduce,Shuffle是连接map阶段和reduce阶段的桥梁。由于分布式计算中,每个阶段的各个计算节点只处理任务的一部分数据,若下一个阶段需要依赖前面阶段的所有计算结果时,则需要对前面阶段的所有计算结果进行重新整合和分类,这就需要经历shuffle过程。
在spark中,RDD之间的关系包含窄依赖和宽依赖,其中宽依赖涉及shuffle操作。因此在spark程序的每个job中,都是根据是否有shuffle操作进行阶段(stage)划分,每个stage都是一系列的RDD map操作。

二、shuffle操作为什么耗时?

shuffle操作需要将数据进行重新聚合和划分,然后分配到集群的各个节点上进行下一个stage操作,这里会涉及集群不同节点间的大量数据交换。由于不同节点间的数据通过网络进行传输时需要先将数据写入磁盘,因此集群中每个节点均有大量的文件读写操作,从而导致shuffle操作十分耗时(相对于map操作)。

三、Spark目前的ShuffleManage模式及处理机制

Spark程序中的Shuffle操作是通过shuffleManage对象进行管理。Spark目前支持的ShuffleMange模式主要有两种:HashShuffleMagnage 和SortShuffleManage
Shuffle操作包含当前阶段的Shuffle Write(存盘)和下一阶段的Shuffle Read(fetch),两种模式的主要差异是在Shuffle Write阶段,下面将着重介绍。

  • 1、HashShuffleMagnage

HashShuffleMagnage是Spark1.2之前版本的默认模式,在集群中的每个executor上,其具体流程如下图所示:

从图中可知,在executor中处理每个task后的结果均会通过buffler缓存的方式写入到多个磁盘文件中,其中文件的个数由shuffle算子的numPartition参数指定(图中partition为3)。因此Shuffle Write 阶段会产生大量的磁盘文件,整个Shuffle Write 阶段的文件总数为: Write阶段的task数目* Read阶段的task数目
由于HashShuffleManage方式会产生很多的磁盘文件,Spark对其进行了优化,具体优化点为:
(1)executor处理多个task的时候只会根据Read阶段的task数目(设为m)生成对应的文件数,具体做法是:处理第一个task时生成m个文件,后续task的结果追加到对应的m个文件中。
(2)考虑到executor的并行计算能力(core数量),处理任务的每个core均会生成m个文件。
因此,优化后的HashShuffleManage最终的总文件数:Write阶段的core数量* Read阶段的task数目

  • 2、SortShuffleManage

SortShuffleManage是Spark1.2及以上版本默认的ShuffleManage模式,具体包含普通模式和bypass模式。
1、普通模式
在集群中的每个executor上,其普通模式的具体流程如下图所示:

从图中可知,SortShuffleManage在数据写入磁盘文件前有两个重要操作:
(1)数据聚合,针对可聚合的shuffle操作(比如reduceBykey()),会基于key值进行数据的聚合操作,以此减少数据量。
(2)数据聚合之后会对数据进行排序操作。
(问题:基于key排序?排序的目的是什么?),
最后对每个task生成的文件进行合并,通过索引文件标注key值在文件中的位置。
因此,SortShuffleManage产生的总文件数为:Writer 阶段的task数*2
2、bypass模式
bypass模式与HashShuffleMagnage基本一致,只是Shuffle Write 阶段在最后有一个文件合并的过程,最终输出的文件个数为:Writer阶段的task数目*2
spark.shuffle.sort.bypassMergeThreshold默认值为200,即Read阶段的task数目小于等于该阈值时以及Write端是非聚合操作(比如join),会启用bypass模式,其他情况下采用普通机制。

四、Spark 程序的shuffle调优

Shuffle阶段需要将数据写入磁盘,这其中涉及大量的读写文件操作和文件传输操作,因此对节点的系统IO有比较大的影响,因此可通过调整参数,减少shuffle阶段的文件数和IO读写次数来提高性能,具体参数主要有以下几个:
1)spark.shuffle.manager
设置Spark任务的shuffleManage模式,1.2以上版本的默认方式是sort,即shuffle write阶段会进行排序,每个executor上生成的文件会合并成两个文件(包含一个索引文件)。
2)spark.shuffle.sort.bypassMergeThreshold
设置启用bypass机制的阈值(默认为200),若Shuffle Read阶段的task数小于等于该值,则Shuffle Write阶段启用bypass机制。
3)spark.shuffle.file.buffer (默认32M)
设置Shuffle Write阶段写文件时buffer的大小,若内存比较充足的话,可以将其值调大一些(比如64M),这样能减少executor的IO读写次数。
4)spark.shuffle.io.maxRetries (默认3次)
设置Shuffle Read阶段fetches数据时的重试次数,若shuffle阶段的数据量很大,可以适当调大一些。

特别申明:部分内容来自或参考文档:http://blog.csdn.net/lw_ghy/article/details/51419760

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 的Shuffle过程详解 的相关文章

  • Scikit-learn、带有洗牌组的 GroupKFold?

    我正在使用 scikit learn 中的 StratifiedKFold 但现在我还需要观察 组 有一个很好的函数 GroupKFold 但我的数据非常依赖时间 与帮助中的相似 即周数是分组索引 但每周应该只折叠一次 假设我需要折叠 10
  • 猪的组连接等效吗?

    试图在 Pig 上完成这个任务 寻找 MySQL 的 group concat 等效项 例如 在我的表中 我有以下内容 3fields userid clickcount pagenumber 155 2 12 155 3 133 155
  • Windows 上的 Apache Pig 在运行“pig -x local”时出现“hadoop-config.cmd”未被识别为内部或外部命令”错误

    如果您由于以下错误而无法在 Windows 上运行 Apache Pig hadoop 2 4 0 bin hadoop config cmd is not recognized as an internal or external com
  • 更改 Spark Streaming 中的输出文件名

    我正在运行一个 Spark 作业 就逻辑而言 它的性能非常好 但是 当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时 输出文件的名称格式为 part 00000 part 00001 等 有没有办法更改输出文件名 谢谢
  • 处理 oozie 工作流程中的循环

    我有一个 oozie 用例 用于检查输入数据可用性并根据数据可用性触发 MapReduce 作业 所以我编写了一个 shell 脚本来检查输入数据 并在 oozie 中为其创建了一个 ssh 操作 输入数据检查的重试次数和重试间隔应该是可配
  • Hadoop 推测任务执行

    在Google的MapReduce论文中 他们有一个备份任务 我认为这与Hadoop中的推测任务是一样的 推测任务是如何实现的 当我启动一项推测任务时 该任务是从一开始就作为较旧且缓慢的任务开始 还是从较旧的任务到达的位置开始 如果是这样
  • 名称节点处于安全模式

    我提到了这些问题名称节点处于安全模式 无法离开 https stackoverflow com questions 15803266 name node is in safe mode not able to leave and SafeM
  • Spark 写入 hdfs 无法使用 saveAsNewAPIHadoopFile 方法

    我在 CDH 5 2 0 上使用 Spark 1 1 0 并试图确保我可以读取和写入 hdfs 我很快意识到 textFile 和 saveAsTextFile 调用旧的 api 并且似乎与我们的 hdfs 版本不兼容 def testHD
  • java.lang.ClassNotFoundException:找不到类 org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem

    我是 Spark 和 Kubernetes 世界的新手 我使用 docker image tool sh 实用程序使用与 Hadoop 3 2 捆绑在一起的官方 Spark 3 0 1 构建了 Spark docker 映像 我还为 Jup
  • 如何按行扩展数组值!!使用 Hive SQL

    我有一个有 4 列的表 其中一列 项目 类型是 ARRAY 其他是字符串 ID items name loc id1 item1 item2 item3 item4 item5 Mike CT id2 item3 item7 item4 i
  • 无法验证 serde:org.openx.data.jsonserde.jsonserde

    我编写了这个查询来在配置单元上创建一个表 我的数据最初是 json 格式 所以我已经下载并构建了 serde 并添加了它运行所需的所有 jar 但我收到以下错误 FAILED Execution Error return code 1 fr
  • 随机化数组元素

    我有一个数组 number 1 2 3 4 5 6 7 8 9 现在 我想随机化数组内容 例如 5 3 2 6 7 1 8 请指导我如何继续 Use the shuffle方法 irb main 001 0 gt 1 2 3 4 5 shu
  • 打乱列表并返回副本

    我想对数组进行洗牌 但我找到的只是类似的方法random shuffle x from 在 Python 中随机化字符串列表的最佳方法 https stackoverflow com questions 1022141 best way t
  • Hive:在查询中将 array 转换为 array

    我有两张桌子 create table a 1 array
  • 当我将文件存储在 HDFS 中时,它们会被复制吗?

    我是 Hadoop 新手 当我使用以下方式存储 Excel 文件时hadoop fs putcommoad 它存储在HDFS中 复制因子为3 我的问题是 是否需要3份并分别存储到3个节点中 这是 HDFS 工作的漫画 https docs
  • 将 Apache Zeppelin 连接到 Hive

    我尝试将我的 apache zeppelin 与我的 hive 元存储连接起来 我使用 zeppelin 0 7 3 所以没有 hive 解释器 只有 jdbc 我已将 hive site xml 复制到 zeppelin conf 文件夹
  • 公平调度器和容量调度器有什么区别?

    我是 Hadoop 世界的新手 想了解公平调度程序和容量调度程序之间的区别 另外我们什么时候应该使用每一个 请简单地回答一下 因为我在网上读了很多东西 但从中得到的不多 公平调度是一种为作业分配资源的方法 使得所有作业随着时间的推移平均获得
  • Python 包安装:pip 与 yum,还是两者一起安装?

    我刚刚开始管理 Hadoop 集群 我们使用 Bright Cluster Manager 直至操作系统级别 CentOS 7 1 然后使用 Ambari 以及适用于 Hadoop 的 Hortonworks HDP 2 3 我不断收到安装
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 如何对 RDD 进行分区

    我有一个文本文件 其中包含大量由空格分隔的随机浮动值 我正在将此文件加载到 scala 中的 RDD 中 这个RDD是如何分区的 另外 是否有任何方法可以生成自定义分区 以便所有分区都具有相同数量的元素以及每个分区的索引 val dRDD

随机推荐

  • Spring Security快速入门

    Spring Security是一个框架 提供 认证 authentication 授权 authorization 和 保护 以抵御常见的攻击 它对保护命令式和响应式应用程序有一流的支持 是保护基于Spring的应用程序的事实标准 spr
  • Java中的集合及深拷贝与浅拷贝

    Java中的集合及深拷贝与浅拷贝 Java是一种面向对象的编程语言 其中集合是常用的数据结构之一 具有方便快捷的特点 在Java开发中 我们常常需要对集合进行复制 拷贝 操作 但是 拷贝操作并不是简单的复制 而应该分为浅拷贝和深拷贝两种不同
  • MySQL学习笔记1:MySQL字符集和字符集编码

    MySQL学习笔记索引 MySQL学习笔记1 MySQL字符集和字符集编码 MySQL学习笔记2 如何避免数据库乱码 MySQL学习笔记3 排序规则和排序规则的影响 MySQL学习笔记4 排序规则的修改 文章目录 一 基本概念 二 mysq
  • 【学习笔记】应用与编排管理:Deployment

    学习笔记 应用与编排管理 Deployment 需求来源 背景问题 Deployment 管理部署发布的控制器 架构设计 管理模式 Deployment 控制器 ReplicaSet 控制器 发布模拟 spec 字段解析 升级策略字段解析
  • 闭包使用的3种情景

    定义 通俗讲 闭包是函数里面再定义一个函数 里层函数能访问到外层函数的局部变量 也就是说闭包是一个能访问外层函数局部变量的函数 常用情景有以下3种 1 在window下有个全局变量a 在函数checkScope内部有个局部变量a 需求 在w
  • Parkour World 游戏内测攻略详解

    Parkour World 是一款将赛博朋克风格与跑酷元素结合的运动类PC端游戏 玩家通过在虚拟世界中进行跑酷 从而获取相应的奖励 Parkour World结合区块链技术 采用保值NFT以及独特的3token Arb Hood Yuri
  • Spring源码分析(七)Bean生命周期源码解析5:Bean的销毁

    Bean的销毁过程 应用场景 通过实现DisposableBean接口 或者使用注解 PreDestroy都行 Component public class OrderService implements DisposableBean pu
  • 游戏开发笔记十三 游戏输入消息处理(二) 鼠标消息处理

    本系列文章由zhmxy555编写 转载请注明出处 http blog csdn net zhmxy555 article details 7405479 作者 毛星云 邮箱 happylifemxy qq com 欢迎邮件交流编程心得 上一
  • 领域驱动设计:DDD 关键概念

    文章目录 领域和子域 核心域 通用域和支撑域 通用语言 限界上下文 实体 值对象 聚合 聚合根 设计聚合 DDD 的知识体系提出了很多的名词 像 领域 子域 核心域 通用域 支撑域 限界上下文 聚合 聚合根 实体 值对象等等 非常多 领域和
  • 归一化笔记

    目录 为什么要归一化 归一化的方法有哪些 归一化各方法特点 归一化的意义 哪些机器学习算法需要做归一化 哪些机器学习算法不需要做归一化 为什么要归一化 因为每一列数据的量纲不同 导致数据分布区间区间存在差异 举例 人的身高可以是180cm
  • 搜狐2012年校园招聘会笔试题解析

    一 不定项选择题 1 以下程序的打印结果是 cpp view plain copy include
  • QT笔记- 使窗口不获得焦点,但响应鼠标事件

    HWND wid HWND this gt winId SetWindowLong wid GWL EXSTYLE GetWindowLong wid GWL EXSTYLE WS EX NOACTIVATE WS EX COMPOSITE
  • Polycarp and Div 3【Codeforces Round #496 (Div. 3)【D题】】【贪心】

    应该说是今天凌晨的吧 第一次打Code Forces 懵懵懂懂的 不过感觉还是良好 做了几道签到题 难题还是没有那个水准去做 Polycarp likes numbers that are divisible by 3 He has a h
  • 应用统计学与R语言实现笔记(番外篇四)——bookdown使用与OR值计算

    本期是之前做的应用统计学与R语言实现笔记的番外篇四 本期主要关注两个问题 一个是重新利用R的bookdown包创建新的电子书 另一个是计算公共卫生当中一个比较常见的指标OR值 文章目录 1 bookdown使用 2 公式更正 3 OR值计算
  • linux下网站压力测试工具webbench

    webbench最多可以模拟3万个并发连接去测试网站的负载能力 个人感觉要比Apache自带的ab压力测试工具好 安装使用也特别方便 1 适用系统 Linux 2 编译安装 引用wget http blog s135 com soft li
  • Sentinel 入门使用

    目录 一 Sentinel简介 1 1Sentinel简介 1 2 Sentinel与Hystrix的区别 1 3 名词解释 二 sentinel控制台 2 1 下载启动控制台 2 3 客户端接入控制台 2 4 Rest整合Sentinel
  • python基本概念-关键要素

    1 要素1 数据类型 Python提供了几种内置的数据类型 现在我们只关注其中两种 Python使用int类型表示整数 正整数或负整数 使用str类型表示字符串 Unicode字符序列 如果需要将一个数据项从某种类型转换为另一种类型 可以使
  • DB2数据库连接(jdbc连接)encoding not supported

    在进行db2数据库连接过程中发现了一些问题 报如下错误 com ibm db2 jcc b DisconnectException encoding not supported 该问题的出现是IBM JDK和sun JDK之间相互不支持 解
  • 【滤波器】7. 带通滤波器

    将低通滤波器和高通滤波器串联 如下图所示 就可得到带通滤波器 设低通滤波器的截止频率为 f p 1 f p1 fp1 高通滤波器的截止频率为
  • Spark 的Shuffle过程详解

    一 Shuffle的作用是什么 Shuffle的中文解释为 洗牌操作 可以理解成将集群中所有节点上的数据进行重新整合分类的过程 其思想来源于hadoop的mapReduce Shuffle是连接map阶段和reduce阶段的桥梁 由于分布式