spark的转换算子2

2023-05-16

1)coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
该函数用于将RDD进行重分区,使用HashPartitioner。
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

作用
缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

val ListRDD: RDD[Int] = context.makeRDD(Range(1, 15, 2), 4)
println("缩减分区前: ")
ListRDD.glom.collect.foreach(data => println(data.mkString(",")))
val coalesceRDD: RDD[Int] = ListRDD.coalesce(3)
println("缩减分区后: ")
coalesceRDD.glom.collect.foreach(data => println(data.mkString(",")))

输出:
缩减分区前:
1
3,5
7,9
11,13
缩减分区后:
1
3,5
7,9,11,13

将coalesce参数设置为2:
缩减分区前:
1
3,5
7,9
11,13
缩减分区后:
1,3,5
7,9,11,13

coalesce进行收缩合并分区,减少分区的个数,并没有shuffle操作,但这块也有隐患,数据倾斜是一个问题.

  1. repartition
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
    } 不同于coalesce算子的是,repartition算子必须进行shuffle操作

    将上述coalesce改成repartition算子
    缩减分区前:
    1
    3,5
    7,9
    11,13
    缩减分区后:
    5,13
    1,7
    3,9,11
    参数3改成2

    缩减分区前:
    1
    3,5
    7,9
    11,13
    缩减分区后:
    1,5,7,11
    3,9,13

    比之coalesce算子,shuffle操作导致效率更低了,但是数据倾斜好点,开发中避免shuffle操作更好,提升效率,数据倾斜有其他的处理方式
  2. union
    1.作用: 对源RDD和参数RDD求并集后返回一个新的RDD
    2.需求: 创建两个RDD,求并集
val ListRDD: RDD[Int] = context.makeRDD(1 to 10, 4)
    val ListRDD2: RDD[Int] = context.makeRDD(5 to 12)
    val unionRDD: RDD[Int] = ListRDD.union(ListRDD2)
    unionRDD.collect.foreach(data => print(data + " "))

输出:1 2 3 4 5 6 7 8 9 10 5 6 7

  1. subtract
    1.作用: 计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
    2.需求: 创建两个RDD,求第一个RDD与第二个RDD的差集
    将上述函数union改成subtract

    输出:4 8 1 9 2 10 3

5).cartesian
1.作用: 笛卡尔积(尽量避免使用)
2.创建两个RDD,计算两个RDD的笛卡尔积

val ListRDD: RDD[Int] = context.makeRDD(1 to 3, 4)
    val ListRDD2: RDD[Int] = context.makeRDD(5 to 7)
    val cartesianRDD: RDD[(Int, Int)] = ListRDD.cartesian(ListRDD2)
    cartesianRDD.collect.foreach(data => print(data + " "))

输出:(1,5) (1,6) (1,7) (2,5) (2,6) (2,7) (3,5) (3,6) (3,7)
6) zip
1.作用将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常.
2.需求:创建两个RDD,并将两个RDD组合到一起形成一个(k,v)的RDD

val ListRDD: RDD[Int] = context.makeRDD(1 to 3,2)
    val ListRDD2: RDD[Int] = context.makeRDD(5 to 7,2)
    val cartesianRDD: RDD[(Int, Int)] = ListRDD.zip(ListRDD2)
    cartesianRDD.collect.foreach(data => print(data + " "))

7)partitionBy案例
1.作用:对partitionRDD进行分区操作,如果原有的partitionRDD和现有的partition是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程.
2.需求:创建一个4个分区的RDD,对其重新分区

val ListRDD: RDD[(Int, String)] = context.makeRDD(Array((1, "asp"), (2, "scala"), (3, "spark"), (4, "Python")), 4)
    val HashRDD: RDD[(Int, String)] = ListRDD.partitionBy(new org.apache.spark.HashPartitioner(2))
    HashRDD.mapPartitionsWithIndex {
      case (nums,datas) => datas.map((_," 分区:"+nums))
    }.collect.foreach(println)

输出:
((2,scala), 分区:0)
((4,Python), 分区:0)
((1,asp), 分区:1)
((3,spark), 分区:1)
底层实现:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

如果获取到的key是null,则被分到0分区
其他key调用nonNegativeMod函数(x:Int,mod:Int) x指的是当前key的hashCode值,用x 取模 分区数,这个余数就是新的分区号

利用这一点我们可以自定义分区器
8)自定义分区器
val ListRDD: RDD[(String, Any)] = context.makeRDD(Array((“Python”, “人生苦短”), (“Scala”, 3), (“Spark”, “内存计算”), (“Hadoop”, “大数据存储”), ("", “没有Key”)), 4)

    val MypartitionRDD: RDD[(String, Any)] = ListRDD.partitionBy(new Mypartition(3))
    MypartitionRDD.mapPartitionsWithIndex {
      case (nums, datas) => datas.map((_, " 分区:" + nums))
    }.collect.foreach(println)
  }
}
class Mypartition(partition: Int) extends Partitioner {
  override def numPartitions: Int = {
    partition
  }
  override def getPartition(key: Any): Int = key match {
    case key if key.toString.contains("a") => 0   //key中包含字符a 的分到0分区
    case key if ("").equals(key.toString) => 1    //key中是null 的分到1分区
    case _ => 2         //其余分到2分区
  }
}

输出:
((Scala,3), 分区:0)
((Spark,内存计算), 分区:0)
((Hadoop,大数据存储), 分区:0)
((,没有Key), 分区:1)
((Python,人生苦短), 分区:2)
常用代码以省略。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spark的转换算子2 的相关文章

  • 数据倾斜

    数据倾斜发生时的现象 1 绝大多数task执行得都非常快 但个别task执行的极慢 2 原本能正常执行的Spark作业 某天突然爆出OOM 内存溢出 异常 观察异常栈 是我们写的业务代码造成的 数据倾斜发生的原理 在进行shuffle的时候
  • Spark集群安装部署

    目录 一 环境准备 二 安装步骤 三 使用Standalone模式 四 使用Yarn模式 一 环境准备 由于Spark仅仅是一种计算机框架 不负责数据的存储和管理 因此 通常都会将Spark和Hadoop进行统一部署 由Hadoop中的HD
  • 任务长期不释放和占用单节点持续的cpu,导致hivesever2本身内存泄漏造成

    任务长期不释放和占用单节点持续的cpu 导致hivesever2本身内存泄漏造成 产生的原因在于 查询过于复杂或者数据量过大 当有复杂的查询或处理大量数据的请求时 HiveServer2可能会出现高负载 这可能涉及大量的计算 IO操作或涉及
  • spark创建maven工程创建scala目录并编译

    背景 我创建spark的maven工程的时候 在java目录同级还创建了一个scala目录 这就得考虑编译相关的事了 解决 1 创建source folder 如下图所示 直接创建就好了 2 编译带来的问题 编译的时候发现一个问题 就是在s
  • Spark基础知识(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 一 Spark概述 Spark模块 Core SQL Streami
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin
  • 大数据--pyspark远程连接hive

    上一篇文章介绍了python连接hive的过程 通过地址 端口号访问到hive并对hive中的数据进行操作 这一篇文章介绍一下怎么通过windows本地pyspark 本地部署好的spark 远程虚拟机的hive 完成本地pyspark对h
  • spark-shell 加载本地文件报错 java.io.FileNotFoundException

    学习spark shell 时候发现一个问题 从本地文件加载数据生成RDD 报错 文件找不到 原因 spark shell 如果启动了集群模式 真正负责计算的executor会在 该executor所在的 worker节点上读取文件 并不是
  • scala和spark的下载与安装

    简易安装scala和spark 一 安装scala 1 安装scala scala下载注意和jdk的版本号 下载地址 https www scala lang org download 2 上传到linux虚拟机里 可通过rz方式上传 上传
  • Spark SQL 之 Temporary View

    Spark SQL 之 Temporary View spark SQL的 temporary view 是支持原生SQL 的方式之一 spark SQL的 DataFrame 和 DataSet 均可以通过注册 temporary vie
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • Spark 源码阅读一-启动脚本

    Spark Complile Help Links Because spark 1 5 need maven version 3 3 3 so i track the branch 1 4 git branch a git checkout
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca
  • 记一次Spark打包错误:object java.lang.Object in compiler mirror

    使用maven compile和package 一直报错scala reflect internal MissingRequirementError object scala runtime in compiler mirror not f
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • 数据中台-让数据用起来-6

    文章目录 第六章 数据开发 数据价值提炼工厂 6 1 数据计算能力的4种类型 6 1 1 批计算 6 1 2 流计算 6 1 3 在线查询 6 1 4 即席分析 6 2 离线开发 1 作业调度 2 基线控制 3 异构存储 4 代码校验 5
  • spark hadoop环境及运行

    hadoop配置 在Ubuntu20 04里安装Hadoop详细步骤 图文 亲测成功 ubuntu20 04安装hadoop 菜鸡的学习之路的博客 CSDN博客 启动hadoop root ubuntu usr local hadoop s
  • Spark Sql之dropDuplicates去重

    文章目录 算子介绍 示例 问题 解决 dropDuplicates和distinct 参考 算子介绍 dropDuplicates去重原则 按数据行的顺序保留每行数据出现的第一条 dropDuplicates 在Spark源码里面提供了以下
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可

随机推荐

  • 【无标题】es搜索基本操作

    一 xff0c 准备数据 1 创建索引 PUT lagou book 2 创建mapping PUT lagou book doc mapping 34 properties 34 34 description 34 34 type 34
  • 【ES】常用操作工具

    工欲善其事 xff0c 必先利于器 xff0c es使用过程中 xff0c 有些工具能帮助我们快速的上手和使用 一 es head es head 是一款专门针对 es的客户端工具elasticSearch配置包 是一个基于node js的
  • 【es】基本概念理解

    一 xff0c 初识es 1 是什么 xff1f ElasticSearch 简称es 开源的分布式的全文搜索引擎 xff0c 可以近乎实时的存储检索数据 xff0c es使用java开发 xff0c 并且使用Lucene作为核心实现搜索功
  • 无法安装net framework 3.5 的解决方法

    电脑刚重装了Windows8 1系统 xff0c 然后安装数据库的时候 xff0c 却出现了这样的问题 xff1a 您的电脑上的应用需要使用以下windows功能 问题原因是 xff1a 在安装系统的时候 xff0c NET Framewo
  • 【计算机网络原理】第四章 数据链路层

    今天主要梳理了一下数据链路层的内容 xff0c 如下 一 宏观规划 综合数据链路层的整体 xff0c 分为两大部分 xff0c 第一部分讲解数据链路层的功能 xff0c 第二部分讲解数据链路层的功能 这些协议 xff0c 其实还是为了实现数
  • 【redis】关系型数据库 VS 非关系型数据库

    一 关系型数据库 xff1f 1 概念 关系型数据库是指采用了关系模型来组织数据的数据库 简单来说 xff0c 关系模式就是二维表格模型 主要代表 xff1a SQL Server xff0c Oracle Mysql PostgreSQL
  • resultful风格接口

    一 产生背景 网络应用程序 xff0c 越来越流行前端和后端的分离设计 当前的发展趋势是前端的设计层出不穷 比如 xff1a 各种型号的手机 平板灯其他设计 因为必须要一种统一的机制方便不同的前端和后端进行通信 这就导致了API结构的流行
  • 【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic

    今天碰到一个奇怪的问题 xff0c 如下图 xff1a 一 问题 1 问题截图 上午还可以发送消息成功的 xff0c 下午突然就发送不了消息了 我就检查我代码的问题 xff0c 是传递的格式不对 xff0c 还是数据要求不对 网上的资料显示
  • 【0723】自动化运维——saltstack

    24 1 自动化运维介绍 认识自动化运维 xff1a 传统运维效率低 xff0c 大多工作人为完成传统运维工作繁琐 xff0c 容易出错传统运维每日重复做相同的事情传统运维没有标准化流程传统运维的脚本繁多 xff0c 不能方便管理自动化运维
  • 【mysql】order by多个字段排序

    今天遇到了两个字段排序的问题 xff0c 感觉不是很清晰 xff0c 所有又按照规则查询了下 xff0c 总结下 count都是306的有三个 现在需要同时按照age和count排序 xff0c 测试最后的排序结果 默认都是按照age和co
  • java8使用积累

    1 将List lt T gt 数组转换为String并用逗号隔开 String join 34 34 List 2 idea自动补全代码教程 xff1a https www cnblogs com HF Made p 11417225 h
  • 【java】手动分页工具类

    最近小编遇到一个很绕的问题 xff0c 无法使用mybatis自带的分页插件对符合条件的数据进行分页 xff0c 故收集了一个自动分页的工具类 xff1a public static lt T gt List lt T gt getPage
  • redis 使用bitMap实现统计系统在线用户数量

    BitMap xff0c 简单来说 xff0c 其实也就是 byte 数组 xff0c 用二进制表示 xff0c 一个bit的值 xff0c 或者是0 xff0c 或者是1 xff1b 也就是说一个bit能存储的最多信息是2 它用一个bit
  • xml与实体之间的转换

    在对接一些第三方接口的时候往往需要涉及到一些对xml文件的处理 xff0c 小编今天主要总结一下JavaBean与xml文件之间互相转换的探索与实例 使用JAXB技术实现xml与实体之间的转换 1 是什么 xff1a JAXB xff08
  • SVN中trunk、branch、tag区别

    虽然一直都在用svn作项目管理 xff0c 但是一直都是傻傻的分不清主干 分支 今天小编就来详细介绍一下 xff1a branch 分支 xff1a 版本控制系统的一个特性就是能够把各种修改分离出来放在开发品的一个分割线上 这条线就被称为分
  • win10下如何解决VC++MSDEV.EXE的0xc0000142错误

    最近小编在学习时用到了vC 43 43 这个软件 xff0c 但是一直无法正常启动 小编的电脑是从win10家庭版升级到了win10企业版的 xff0c 之前并没有出现过什么异样 xff0c 直到最近软件无法正常打开 xff0c 从网上搜得
  • windows下获取文件上传路径报错:java.io.FileNotFoundException: (文件名、目录名或卷标语法不正确。)

    小编今天在java编程中使用fastdfs实现文件上传功能的时候 xff0c 获取windows文件路径时报错 xff1a java io FileNotFoundException 文件名 目录名或卷标语法不正确 原因分析 xff1a 出
  • qrcode(一)

    1 二维码概念 二维码又称QR Code xff0c QR全称Quick Response xff0c 是一个近几年来移动设备上超流行的一种编码方式 是用某种特定的几何图形按一定规律在平面 xff08 二维方向上 xff09 分布的黑白相间
  • zk服务启动报错:Unexpected exception exiting abnormally java.io.eofexception

    解决方案 xff1a 找到zoo conf中配置的dataDir和dataLogDir路径 然后删除两个文件夹下的version 2文件夹 重启服务 xff0c 问题解决
  • spark的转换算子2

    1 xff09 coalesce def coalesce numPartitions Int shuffle Boolean 61 false implicit ord Ordering T 61 null RDD T 该函数用于将RDD