Spark优化篇:数据倾斜解决

2023-05-16

数据倾斜是指我们在并行进行数据处理的时候,由于数据散列引起Spark的单个Partition的分布不均,导致大量的数据集中分布到一台或者几台计算节点上,导致处理速度远低于平均计算速度,从而拖延导致整个计算过程过慢,影响整个计算性能。

数据倾斜带来的问题

单个或者多个Task长尾执行,拖延整个任务运行时间,导致整体耗时过大。单个Task处理数据过多,很容易导致OOM。

数据倾斜的产生原因

数据倾斜一般是发生在 shuffle 类的算子、SQL函数导致,具体如以下:

类型RDDSQL
去重distinctdistinct
聚合groupByKey、reduceByKey、aggregateByKeygroup by
关联join、left join、right joinjoin、left join、right join

​​​​​​​

通过Spark web ui event timeline观察明显长尾任务:

数据倾斜大Key定位

RDD进行抽取:

val cscTopKey: Array[(Int, Row)] = sampleSKew(sparkSession,"default.tab_spark","id")
println(cscTopKey.mkString("\n"))

  def sampleSKew( sparkSession: SparkSession, tableName: String, keyColumn: String ): Array[(Int, Row)] = {
    val df: DataFrame = sparkSession.sql("select " + keyColumn + " from " + tableName)
    val top10Key: Array[(Int, Row)] = df
      .select(keyColumn).sample(withReplacement = false, 0.1).rdd
      .map(k => (k, 1)).reduceByKey(_ + _)
      .map(k => (k._2, k._1)).sortByKey(ascending = false)
      .take(10)
    top10Key
  } 

SQL进行抽取:

SELECT
	id,conut(1) as cn
FROM
	default.tab_spark_test_3
GROUP BY id	
ORDER BY cn DESC
LIMIT 100;
100000,2000012
100001,1600012
100002,1

单表数据倾斜优化

为了减少 shuffle 数据量以及 reduce 端的压力,通常 Spark SQL 在 map 端会做一个partial aggregate(通常叫做预聚合或者偏聚合),即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总,类似 MR 的提前Combiner,所以执行计划中 HashAggregate 通常成对出现。 但是这种也会出现问题,如果key重复的量级特别大,Combiner也是解决不了本质问题。

解决方案:

Add Salt局部聚合 2、Remove Salt全局聚合

sparkSession.udf.register("random_prefix", ( value: Int, num: Int ) => randomPrefixUDF(value, num))
sparkSession.udf.register("remove_random_prefix", ( value: String ) => removeRandomPrefixUDF(value))

		//t1 增加前缀,t2按照加盐的key进行聚,t3去除加盐,聚合
    val sql =
      """
        |select
        |  id,
        |  sum(sell) totalSell
        |from
        |  (
        |    select
        |      remove_random_prefix(random_id) id,
        |      sell
        |    from
        |      (
        |        select
        |          random_id,
        |          sum(pic) sell
        |        from
        |          (
        |            select
        |              random_prefix(id, 6) random_id,
        |              pic
        |            from
        |              default.tab_spark_test_3
        |          ) t1
        |        group by random_id
        |      ) t2
        |  ) t3
        |group by
        |   id
      """.stripMargin
      
def randomPrefixUDF( value: Int, num: Int ): String = {
    new Random().nextInt(num).toString + "_" + value
  }

def removeRandomPrefixUDF( value: String ): String = {
    value.toString.split("_")(1)
  }  

表关联数据倾斜优化

1、适用场景

适用于 join 时出现数据倾斜。

2、解决逻辑

1、将存在倾斜的表,根据抽样结果,拆分为倾斜 key(skew 表)和没有倾斜 key(common)的两个数据集;

2、将 skew 表的 key 全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old 表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍,得到 new 表)。

3、打散的 skew 表 join 扩容的 new 表

union common 表 join old 表

以下为打散大 key 和扩容小表的实现思路

1、打散大表:实际就是数据一进一出进行处理,对大 key 前拼上随机前缀实现打散;

2、扩容小表:实际就是将 DataFrame 中每一条数据,转成一个集合,并往这个集合里循环添加 10 条数据,最后使用 flatmap 压平此集合,达到扩容的效果。

 /**
   * 打散大表  扩容小表 解决数据倾斜
   *
   * @param sparkSession
   */
  def scatterBigAndExpansionSmall(sparkSession: SparkSession): Unit = {
    import sparkSession.implicits._
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
      .withColumnRenamed("discount", "pay_discount")
      .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
      .withColumnRenamed("discount", "cart_discount")
      .withColumnRenamed("createtime", "cart_createtime")

    // TODO 1、拆分 倾斜的key
    val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
    val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)

    //TODO 2、将倾斜的key打散  打散36份
    val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
      partitions.map(item => {
        val courseid = item.getAs[Long]("courseid")
        val randInt = Random.nextInt(36)
        CourseShoppingCart(courseid, item.getAs[String]("orderid"),
          item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
          item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
          item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
      })
    })
    //TODO 3、小表进行扩容 扩大36倍
    val newSaleCourse = saleCourse.flatMap(item => {
      val list = new ArrayBuffer[SaleCourse]()
      val courseid = item.getAs[Long]("courseid")
      val coursename = item.getAs[String]("coursename")
      val status = item.getAs[String]("status")
      val pointlistid = item.getAs[Long]("pointlistid")
      val majorid = item.getAs[Long]("majorid")
      val chapterid = item.getAs[Long]("chapterid")
      val chaptername = item.getAs[String]("chaptername")
      val edusubjectid = item.getAs[Long]("edusubjectid")
      val edusubjectname = item.getAs[String]("edusubjectname")
      val teacherid = item.getAs[Long]("teacherid")
      val teachername = item.getAs[String]("teachername")
      val coursemanager = item.getAs[String]("coursemanager")
      val money = item.getAs[String]("money")
      val dt = item.getAs[String]("dt")
      val dn = item.getAs[String]("dn")
      for (i <- 0 until 36) {
        list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
          edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i + "_" + courseid))
      }
      list
    })

    // TODO 4、倾斜的大key 与  扩容后的表 进行join
    val df1: DataFrame = newSaleCourse
      .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
      .join(coursePay, Seq("orderid", "dt", "dn"), "left")
      .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
        , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
        "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")


    // TODO 5、没有倾斜大key的部分 与 原来的表 进行join
    val df2: DataFrame = saleCourse
      .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
      .join(coursePay, Seq("orderid", "dt", "dn"), "left")
      .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
        , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
        "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

    // TODO 6、将 倾斜key join后的结果 与 普通key join后的结果,uinon起来
    df1
      .union(df2)
      .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail")
  }

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark优化篇:数据倾斜解决 的相关文章

  • spark性能优化调优指导性文件

    1 让我们看一下前面的核心参数设置 num executors 10 20 executor cores 1 2 executor memory 10 20 driver memory 20 spark default parallelis
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin
  • 【pyspark】DataFrame基础操作(二)

    介绍一下 pyspark 的 DataFrame 基础操作 一 选择和访问数据 PySpark DataFrame 是惰性计算的 简单地选择一列不会触发计算 但它会返回一个 Column 实例 并且 大多数按列操作都返回 Column 实例
  • pyspark 连接远程hive集群配置

    今天本地spark连接远程hive集群 直接把配置导入进去 本地直接应用远程环境 1 安装spark 设置spark环境变量 2 拿到远程集群配置文件 将配置文件放在spark conf 目录下 xml 一共五个文件 3 将mysql co
  • spark dataframe 数据类型转换

    文章目录 1 spark sql数据类型 数字类型 日期类型 复杂类型 2 spark sql和scala数据类型对比 3 spark sql数据类型转换示例 代码 输出 1 spark sql数据类型 数字类型 ByteType 代表一个
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • SparkSQL HiveSQL 常用正则表达式

    SparkSQL HiveSQL 常用正则表达式 目录 SparkSQL HiveSQL 常用正则表达式 1 匹配汉字 2 匹配手机号码 3 匹配身份证 4 SparkSQL HiveSQL 常用正则函数 5 SparkSQL 分组 聚合
  • 【Spark系列2】reduceByKey和groupByKey区别与用法

    在spark中 我们知道一切的操作都是基于RDD的 在使用中 RDD有一种非常特殊也是非常实用的format pair RDD 即RDD的每一行是 key value 的格式 这种格式很像Python的字典类型 便于针对key进行一些处理
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 记一次Spark打包错误:object java.lang.Object in compiler mirror

    使用maven compile和package 一直报错scala reflect internal MissingRequirementError object scala runtime in compiler mirror not f
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • Impala presto hbase hive sparksql

    Impala 技术点梳理 http www cnblogs com TiestoRay p 10243365 html Impala 优点 实时性查询 计算的中间结果不写入磁盘 缺点 对于内存的依赖过于严重 内存溢出直接导致技术任务的失败
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 2020-10-24 大数据面试问题

    上周面试数据开发职位主要从公司的视角讲一下记录下面试流水 1 三面技术一轮hr 面到了cto 整体来看是这一周技术含量最高信息量最大的一个 1到4轮过了4个小时 技术上的问题主要问的对数据分层的理解 1 一面自我介绍 目前团队的规模多大 2
  • 大数据手册(Spark)--Spark基本概念

    文章目录 Spark 基本概念 Hadoop 生态 Spark 生态 Spark 基本架构 Spark运行基本流程 弹性分布式数据集 RDD Spark安装配置 Spark基本概念 Spark基础知识 PySpark版 Spark机器学习
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • sparkstreamming 消费kafka(1)

    pom
  • Spark Sql之dropDuplicates去重

    文章目录 算子介绍 示例 问题 解决 dropDuplicates和distinct 参考 算子介绍 dropDuplicates去重原则 按数据行的顺序保留每行数据出现的第一条 dropDuplicates 在Spark源码里面提供了以下
  • Spark 配置

    文章目录 1 Spark 配置 1 1 Spark 属性 1 1 1 动态加载Spark属性 1 1 2 查看Spark属性 1 2 环境变量 2 重新指定配置文件目录 3 继承Hadoop集群配置 4 定制的Hadoop Hive配置 1
  • Spark 中 BroadCast 导致的内存溢出(SparkFatalException)

    背景 本文基于 Spark 3 1 1 open jdk 1 8 0 352 目前在排查 Spark 任务的时候 遇到了一个很奇怪的问题 在此记录一下 现象描述 一个 Spark Application Driver端的内存为 5GB 一直

随机推荐

  • Android Gradle 7.x新版本的依赖结构变化

    版本的小蜜蜂 小海豚 电鳗版本的Android Studio新建工程的依赖结构和之前的发生了变化 xff0c 主要有 xff1a 原来在工程build gradle中的buildscript和allprojects xff0c 移动至set
  • C#:如何用VS开启人生中第一个Windows窗体应用程序(Winform)?

    摘要 xff1a Windows窗体应用程序 xff08 Winform xff0c 下文以此指代 xff09 既能有效 直观地设计Windows窗体界面 xff0c 又支持内部逻辑的编写 那么 xff0c 对于C 初学者来说 xff0c
  • BootLoader & Grub详解

    BootLoader amp Grub详解 xff08 补记 xff09 2008 8 2 星期日 凉爽 补记 xff1a 2010 xff0d 04 xff0d 21 时隔两年 xff0c 会过头来重新看了一下 xff0c 发现GRUB的
  • 签名问题:EXPKEYSIG F42ED6FBAB17C654 Open Robotics <info@osrfoundation.org>

    sudo apt key adv keyserver keyserver ubuntu com recv keys F42ED6FBAB17C654 代码如上 xff0c 更换签名
  • Python,gnuplot,libsvm配置详细步骤

    1 下载Python xff0c gnuplot以及libsvm 我的电脑是64位 xff0c Win7操作系统 1 1 python 2 7 6 64位 这里我用的Python是64位的Python2 7 6 下载地址 xff1a htt
  • C++中assert函数的用法介绍

    assert宏的原型定义在 lt assert h gt 中 xff0c 其作用是如果它的条件返回错误 xff0c 则终止程序执行 xff0c 原型定义 xff1a inclide lt assert h gt void assert in
  • C++中stdlib.h头文件介绍

    stdlib头文件即standard library标准库头文件 xff0c stdlib头文件里包含了C C 43 43 语言的最常用的系统函数 xff0c 该文件包含了C语言标准库函数的定义 xff0c stdlib h中定义了物种类型
  • 蛋白质性质和结构分析

    原文链接 第七章 蛋白质性质和结构分析 传统的生物学认为 xff0c 蛋白质的序列决定了它的三维结构 xff0c 也就决定了它的功能 由于用X光晶体衍射和NMR核磁共振技术测定蛋白质的三维结构 xff0c 以及用生化方法研究蛋白质的功能效率
  • Libsvm网格参数寻优教程

    原文 xff1a http endual iteye com blog 1262010 首先下载Libsvm Python和Gnuplot xff1a l libsvm的主页http www csie ntu edu tw cjlin li
  • 打井问题

    在偏远的山区 xff0c 水资源很稀缺 xff0c 因此 xff0c 我们问每个山区进行打井工程 xff0c 在不同的地方打了N口井 xff0c 现在我们要在这N口井之间修建管道 xff0c 要使得这些井都能连通 xff0c 同时所使用的管
  • C语言结构体的初始化

    C primer Plus第五版 第14章结构和其他数据形式 1 结构声明 结构声明 xff08 structure declaration xff09 是描述结构体如何组合的主要方法 xff0c 声明就像下面这样 xff1a struct
  • 【Unix编程】文件处理函数

    文件处理函数 xff1a http www iteedu com os linux linuxprgm linuxcfunctions file fcntl php 1 close xff08 关闭文件 xff09 相关函数 open xf
  • ubuntu安装vnc踩的坑

    较新版本的ubuntu 安装vnc 1 搜索setting 把里面的sharing的权限都打开 2 试一下sudo apt get install vnc4server 或者sudo apt y install vnc4server 3 如
  • ElasticSearch 7.6中遇到的一些坑

    一 限制单个index在单个节点上的总shard数 index routing allocation total shards per node 一般在冷热分离的场景种 xff0c 冷数据会设置副本 xff0c 热数据为了保证写入速度 xf
  • 大数据部门组织结构

    平台团队 运维团队 运维工程师最基本的职责都是负责服务的稳定性 xff0c 确保服务可以7 24H不间断地为用户提供服务 xff0c 负责维护并确保整个服务的高可用性 xff0c 同时不断优化系统架构提升部署效率 优化资源利用率 xff1b
  • Hadoop HDFS 副本机制

    Data Replication HDFS is designed to reliably store very large files across machines in a large cluster It stores each f
  • Apache Spark 3.0:全新功能知多少

    Spark3 0解决了超过3400个JIRAs xff0c 历时一年多 xff0c 是整个社区集体智慧的成果 Spark SQL和 Spark Cores是其中的核心模块 xff0c 其余模块如PySpark等模块均是建立在两者之上 Spa
  • Spark优化篇:动态内存管理

    Spark内存管理分为静态内存管理和统一内存管理 xff0c Spark1 6之前使用的是静态内存管理 xff0c Spark1 6之后的版本默认使用的是统一内存管理 动态内存机制图 xff1a 内存估算 xff1a Other Memor
  • Spark优化篇:RBO/CBO

    在Spark1 0中所有的Catalyst Optimizer都是基于规则 rule 优化的 为了产生比较好的查询规 则 xff0c 优化器需要理解数据的特性 xff0c 于是在Spark2 0中引入了基于代价的优化器 xff08 cost
  • Spark优化篇:数据倾斜解决

    数据倾斜是指我们在并行进行数据处理的时候 xff0c 由于数据散列引起Spark的单个Partition的分布不均 xff0c 导致大量的数据集中分布到一台或者几台计算节点上 xff0c 导致处理速度远低于平均计算速度 xff0c 从而拖延