Spark 【分区与并行度】

2023-11-19

RDD 并行度和分区

SparkConf

setMaster("local[*]")

我们在创建 SparkContext 对象时通常会指定 SparkConf 参数,它包含了我们运行时的配置信息。如果我们的 setMaster 中的参数是 "local[*]" 时,通常代表使用的CPU核数为当前环境的最大值。

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    rdd.saveAsTextFile("test_par_out")

    sc.stop()

运行结果:

在设备管理器中查看CPU核数:

 setMaster("local")

这时的使用 CPU 核数的默认值为 1 。

val conf = new SparkConf()
      .setMaster("local")
      .setAppName("test partition")

 

setMaster["local[2]"]

设置使用的 CPU 核数为 2

val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("test partition")

创建RDD时指定分区数

我们也可以在创建 RDD 对象时指定切片数 numSlices(切片数就是分区的数量(通常一个分区对应一个Task,一个Task对应一个Excutor(一个CPU核心)))。

   val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    val sc = new SparkContext(conf)

//第二个参数用来指定并行度(分区数)
    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),1)
    rdd.saveAsTextFile("test_par_out")

    sc.stop()

 

conf.set 指定并行度

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    conf.set("spark.default.parallelism","5")

读取内存数据(集合)的分区规则

核心源码:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }

比如我们读取集合数组 List(1,2,3,4,5),我们在创建RDD对象时设置分区数为 3 。

val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5),3)

 当我们保存时,会输出三个文件,文件内容分别是:

  • part-00000:1
  • part-00001:2,3
  • part-000002:4,5

因为此时我们源码的 positions 的参数是 (length:5,numSilces:3),它会返回三个元组(start,end),对应我们数组的下标,并且左闭右开。

  • part-00000:(0,1)
  • part-00001:(1,3)
  • part-000002:(3,5)

读取文件数据的分区规则

我们在通过读取本地文件系统的文件来创建 RDD 时:

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    conf.set("spark.default.parallelism","5")

    val sc = new SparkContext(conf)

   val rdd = sc.textFile("data/1.txt")

    sc.stop()

默认的分区数量是最小分区数量(2):

//defaultParallelism取决于 setMaster("local[*]") ,如果是 local[*] 代表分区数=CPU核数 但是min方法返回最小值,最小值=2
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Spark 分区规则和 Hadoop 是一样的,只是切片规则和数据读取规则有差异。

案例-文件a.txt:

1
2
3

Spark 分区数量的计算方式:

源码:

long totalSize = 0
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

 对于我们上面的文件 a.txt:

// totalSize 是文件的总字节数,一个回车占两个字节
totalSize = 7
goalSize = 7 / (2 == 0 ? 1 : 2) = 3 (单位:byte)    //也就是每个分区占用3个字节

//分区数= totalSize/gogalSize=2...1 (余数1byte,根据hadoop的规则,如果余数>每个分区的字节数的1.1倍,就要产生新的分区,否则就不会产生新的分区)
//这里余数是 1 , 1/3 = 33.3% > 0.1 所以会产生一个新的分区
//所以分区数 = 3

数据分区的分配

1. Spark 数据分区以行为单位进行读取

2. 数据读取时,以偏移量为单位

以上面的 a.txt 为例(@@代表一个回车)

1@@    => 012            
2@@    => 345
3      => 6

 

3. 数据分区的偏移量范围的计算

//注意: 左右都是闭区间,
//偏移量不会被重复读取   
part-00000    => [0,3]    => 1@@,2@@    //读到3的时候已经到了第二行,要读就读一整行,所以2@@都会被读取
part-00001    => [3,6]    => 3 [3,6]对应的第二行的第1个字节(2)~第3行第1个字节(3),而2已经被读过了,所以只剩3
part-00002    => [6,7]    => 

coalesce 和 repartition

coalesce 和 repartition 分别用于缩减分区节省资源和扩大分区提高并行度。

coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

当 Spark 程序中,存在过多的小任务的时候,可以使用 coalesce 方法,收缩合并分区,减少分区的个数,减少任务调度成本。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    val newRDD1 = rdd.coalesce(2)
    /*
      coalesce 默认情况下不会将分区内的数据打乱重新组合,这里是直接将三个分区中两个分区合并为一个分区,另外一个仍然是一个分区
      这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
      如果想要数据均衡,可以进行shuffle处理
      分区结果:
               part-00000: 1 2
               part-00001: 3 4 5 6
     */
    val newRDD2 = rdd.coalesce(2,true)
    /*
      分区结果:
        part-0000: 1 4 5
        part-0001: 2 3 6
     */

repartition

repartition 的底层其实就是 coalesce ,为了区分缩减和扩大分区(都可以由coalesce实现),所以分成了两个方法。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    /*
      length=6,numSlices=2
      part-00000:  1 2 3
      part-00001:  4 5 6
     */
    // 想要扩大分区数量 提高并行度 shuffle 必须为true 因为我们要把2个分区的数据分为3个分区 就必须打乱分区内的数据重新排
    // 如果不设置shuffle为true是没有意义的 结果还是2个分区
    val newRdd1 = rdd.coalesce(3,true)
    /*
       分区结果:
          part-00000: 3 5
          part-00001: 2 4
          part-00002: 1 6
     */

    // 缩减分区用 coalesce,如果要数据均衡可以采用 shuffle
    // 扩大分区用 repartition , repartition底层就是 coalesce(numSlices,true)
    rdd.repartition(2)

repartition 底层代码

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 【分区与并行度】 的相关文章

随机推荐

  • 修改网页logo

    在用浏览器打开网站的时候 浏览器标签页上面有网站的图标 类似于logo小图标 如下图 步骤1 打开你的tomcat的安装目录 我的目录实在G盘 G apache tomcat 7 0 53 windows x64 apache tomcat
  • java进制转换方法

    一 十进制向二 八 十六进制的转换 方法一 Integer toBinaryString i 表示十进制转为二进制 Integer toOctalString i 表示十进制转为八进制 Integer toHexString i 表示十进制
  • 周庄不买门票攻略_周庄古镇旅游攻略

    周庄古镇旅游攻略 周庄古镇是世界文化遗产预选地 首批国家5A级旅游景区 位于苏州城东南 位于昆山 吴江 上海三地交界处 周庄古镇四面环水 因河成镇 依水成街 以街为市 井字型河道上完好保存着14座建于元 明 清各代的古石桥 800多户原住民
  • org/springframework/boot/maven/RepackageMojo has been compiled by a more recent version of the Java

    项目场景 项目中执行clean 再执行install时报错 错误如下 org springframework boot maven RepackageMojo has been compiled by a more recent versi
  • Python库之自然语言处理和文本挖掘

    来源地址 http www python88 com topic 37015 https mp weixin qq com s sPAomFg 5JZigFUG CtnaQ 自然语言处理和文本挖掘库主要用于以自然语言文本为对象的数据处理和建
  • linux基本命令练习

    1 列出 etc目录下的所有文件名称 2 创建文件file1 和file2 并复制到 home目录下 3 显示以ma开头的所有命令 ma 双击两次 TAB键 4显示所有文件名中有 bash的文件 用tab命令补全 5 显示当前所在的目录路径
  • android图像识别(百度普通物体识别)

    android图像识别 采用百度sdk 识别准确率基本上能用 主要缺陷是百度sdk免费额度有限 demo链接如下 仅供参考https download csdn net download android xc 12274161
  • Python进阶之CrawlSpider的应用及Scrapy配置项的引用

    1 CrawlSpider的应用 CrawlSpider可以根据规则自动分析链接的数据并按照正则的要求取出需要的数据 scrajpy startproject yg cd yg 注意 t crawl参数 scrapy genspider t
  • 解决SqlServer批量插入最多2100条数据的方法

    SqlServer批量插入数据时最多不能超过2100条 记录一下解决办法 Java代码 public void batchInsert List
  • 基于vue实现移动端点击上方导航,内容滑动切换,滑动内容导航自动切换。

    这是在日常开发过程中常见的选项卡 带滑动切换效果 小白一枚 不足之处还望体谅 包涵 这也是第一次尝试写博客 以后会继续分享一些工作中的问题与收获 实现效果 点击上方导航 当前导航添加样式 下方内容滑动切换 滑动下方内容上面导航切换 第一步
  • 论文笔记:Region Representation Learning via Mobility Flow

    2017 CIKM 1 摘要和介绍 使用出租车出行数据学习区域向量表征 同时考虑时间动态和多跳位置转换 gt 通过flow graph和spatial graph学习表征 出租车交通流可以作为区域相似度的一种 A区域和B区域之间流量大 gt
  • JS 变量提升和函数提升

    变量提升 这里介绍一个变量提升提升的经典案例 for var i 0 i lt 10 i setTimeout gt console log i 1000 这里打印是10个10 因为在执行第一个setTimeout时 Js不会等待1秒后再去
  • 怎么解决“无法打开包括文件:“graphics.h”: No such file or directory”的问题

    在接手同伴的中国象棋项目时 导入项目后 发现VS一直提示 无法打开包括文件 graphics h No such file or directory 在查阅资料后发现是缺少easyx文件 接下来 就介绍一下手动配置一下easyx文件 eas
  • 特殊类型动归--区间动归与环形动归

    区间动归 某类有序事件中前若干个事件的子答案 不能再支撑状态转移 我们需要去寻找 从某个元素起到另一个元素结束所包含所有的 连续 元素的子答案 作为状态 可以想象 在这个描述下 每个状态会对应于原题序列上的一个区间 区间具有良好的性质 短的
  • 深度学习(1):BP神经网络实现银行客户流失预测

    目的 针对银行客户行为和统计数据实现客户流失预测任务 一 数据准备 1 数据集 select data csv 作为训练样本 数据预处理方式 归一化 数值化 CreditScore 信用分数 EB 存贷款情况 EstimatedSalary
  • centos 建立回收站

    linux下的回收站在每一个当前用户目录 local share Trash中 也可以给linux添加一个回收站 mkdir tmp trash tmp 建立一个回收站目录 vi bin trash 编辑一个文件 mv tmp trash
  • python之浅拷贝、深拷贝

    什么是浅拷贝 深拷贝 理论来自python基础教程 在 Python 中 对象赋值实际上是对象的引用 当创建一个对象 然后把它赋给另一个变量的时候 Python 并没有拷贝这个对象 而只是拷贝了这个对象的引用 我们称之为浅拷贝 在 Pyth
  • 腾讯云 Finops Crane 开发者集训营 - 云原生如何助力企业搞定成本优化

    引言 随着docker的技术普及 越来越多的企业加入了云计算发展进程 云原生产业发展迅猛 云原生建设投入比例明显 面对大规模的集群投入 部署 维护等问题也逐渐产生 越来越多的企业对云原生不断提出更高要求 同时云原生技术简化运维的效能提 升开
  • .Net WebAPI 高速下载文件接口实现

    接触WebAPI一年多了 感觉是个承上启下 开创未来的技术 老一辈程序员写接口就像写方法一样 不需要了解太多网页的知识 却可以在浏览器中访问这些接口 由于是基于HTTP协议 因此不管是PC 手机还是嵌入式均可顺利访问 对于当下软件多终端的设
  • Spark 【分区与并行度】

    RDD 并行度和分区 SparkConf setMaster local 我们在创建 SparkContext 对象时通常会指定 SparkConf 参数 它包含了我们运行时的配置信息 如果我们的 setMaster 中的参数是 local