大数据技术之Spark——Spark SQL

2023-10-31

一、SparkSQL 概述

1.1  SparkSQL是什么

        Spark SQL是Spark用于结构化数据处理的Spark模块。

1.2 Hive and SparkSQL

        我们之前学习过hive,hive是一个基于hadoop的SQL引擎工具,目的是为了简化mapreduce的开发。由于mapreduce开发效率不高,且学习较为困难,为了提高mapreduce的开发效率,出现了hive,用SQL的方式来简化mapreduce:hive提供了一个框架,将SQL转换成mapreduce来执行。执行的效率不会因此提升,但开发效率会大大提高。

        同样的,sparkCore的代码能不能转换成SQL语句呢?sparkSQL的前身是Shark,是一个将spark和hive结合的框架,利用hive SQL简化的思想,将RDD进行简化。Shark的出现,是SQL-on-Hadoop的性能比Hive有了10-100倍的提高。

        随着spark的发展,shark的发展受制于hive,在此基础上发展出sparkSQL和hive on spark,SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive。

        sparkSQL可以用于简化RDD的开发,提高开发效率,且执行效率飞铲快。所以实际工作中,基本上采用的都是sparkSQL。sparkSQL为了简化RDD的开发,提高开发效率,提供了2个编程抽象,类似于spark Core的RDD。

DataSet

DataFrame

        

1.3 SparkSQL特点

1.3.1 易整合

无缝的整合了SQL查询和Spark编程

1.3.2 统一的数据访问

使用相同的方式连接不同的数据源

1.3.3 兼容Hive

在已有的仓库上直接运行SQL或者HiveQL

1.3.4 标准数据连接

通过JDBC或者ODBC来连接

1.4 DataFrame是什么

        在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,RDD只关心数据,如传入一数组(1,2,3,4),RDD不关心数组的意思。而DataFrame关心数据结构,如主键。

 RDD

        虽然以Person为参数类型,但Spark框架本身不了解Person类的内部结构。

DataFrame

        提供了详细的结构信息,使得SparkSQL可以清楚的知道该数据集中包含哪些列,每列的名称和类型各是什么。

        DataFrame为数据提供了Schema视图,可以把它当作数据库中的一张表来对待。

1.5 DataSet是什么

        DataSet是分布式数据集合,是DataFrame的一个扩展。提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点

        DataFrame是一个特定泛型的DataSet。

二、SparkSQL核心编程

2.1 新的起点

        SparkCore中,如果想要执行应用程序,首先需要构建上下文环境对象SparkContext。SparkSQL可以理解为是对SparkCore的封装。不仅是在模型上进行了封装,上下文环境对象也进行了封装。

        在老的版本中,SparkSQL提供了两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

        SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。Spark Session 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的SparkSession 对 象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样。

2.2 DataFrame

        在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。

2.2.1 创建DataFrame

Spark支持创建文件的数据源格式:spark.read.

csv        format        jdbc        json        load        option        options        orc        
parquet         schema         table        text        textFile

object DataFrameTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("detaSetDemo").getOrCreate()

    val dataFrame = spark.read.json("in/user.json")

    dataFrame.printSchema()
    dataFrame.show()

    spark.stop()
  }

}
注意:
        如果从内存中获取数据,spark 可以知道数据类型具体是什么。
        如果是数字,默认作 为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。

结果展示: 

 2.2.2 SQL语法

1)读取JSON文件创建DataFrame

val dataFrame = spark.read.json("in/user.json")

2)对DataFrame创建一个临时表

dataFrame.createTempView("user")
// 或
dataFrame.createOrReplaceTempView("user")

3)通过SQL语句实现查询全表

val frame = spark.sql("select * from user")

4)结果展示

frame.show()

 

 注意:

普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

2.2.3 DSL语法

        DataFrame提供一个特定领域语言(domain-specific kanguage, DSL)去管理结构化的数据。可以在Sacla, Java, Python和 R 中使用DSL,使用DSL语法风格不必去创建临时试图了。

1)创建一个DataFrame

val df = spark.read.json("in/user.json")

2)查看DataFrame的Schema信息

df.printSchema()

3)只查看列数据的6种方式

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
//    输出的6种方式
    import spark.implicits._
    userDF.select('name,'age).show()
    userDF.select("name","age").show()
    userDF.select($"name",$"age").show()

    userDF.select(userDF("name"),userDF("age")).show()

    userDF.select(col("name"),col("age")).show()
    userDF.select(column("name"),column("age")).show()

    val idColumn = df("id")

4)查看“age”大于“22”的数据

条件过滤可以使用filter,也可以使用where,where的底层调用的也是filter方法。

df.select(userDF("name"),userDF("age"),(userDF("age")+1).as("ageinc"))
      .where($"name"=!="zhangsan").show()   // where底层也是filter
//        .filter($"ageinc">22).show()

5)按照“age”分区,查看数据条数

val countDF = df.groupBy("age").count()
countDF.printSchema()

6)增加列withColumn

val frame = countDF.withColumn("number",$"count".cast(StringType))

7)修改列名withColumnRenamed

 val frame2 = countDF.withColumnRenamed("count","number")

2.2.4 RDD转换为DataFrame

        在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._

        这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

rdd =>DataFrame: rdd.toDF

DataFrame => rdd: df.rdd

2.3 DataSet

        DataSet是具有强类型的数据集合,需要提供对于的类型信息。

2.3.1 创建DataSet

object DataSetDemo {

// 1)使用样例类来创建DataSet
  case class Point(label:String,x:Double,y:Double)
  case class Category(id:Long,name:String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("dataSet").getOrCreate()
    val sc = spark.sparkContext

//    重点记忆
    import spark.implicits._


// 2)使用基本类型的序列创建DataSet
    val points: Seq[Point] = Seq(Point("nj", 23.43, 57.12), Point("bj", 18.21, 199.43), Point("sh", 16.11, 18.3))
    val pointDS = points.toDS()

    val categories: Seq[Category] = Seq(Category(1, "nj"), Category(2, "bj"))
    val categoryDS = categories.toDS()
    categoryDS.printSchema()
    categoryDS.show()


}
注意
        在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。

2.3.2 RDD转换为DataSet

        SparkSQL能够自动将包含case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射编程了表的列名。case类可以包含如Seq或者Array等复杂的结构。

case class User(name: String, age: Int)

sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDS

2.3.3 DataSet转换为RDD

        DataSet也是对RDD的封装,所以可以直接获得内部的RDD。

case class User(name: String, age: Int)
val res1 = sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDS

val rdd = res1.rdd

2.3.4 DataFrame和DataSet转换

DataFrame => DataSet:as[样例类]

DataSet => DataFrame:toDF

case class User(name: String, age: Int)
val userDF = sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDF("name", "age")

val userDS = userDF.as[User]

2.4 RDD、DataFrame、DataSet 三者的关系

2.4.1 相互转化

// RDD <=> DataFrame
val rdd = spark.sparkContext.makeRDD(List(1,"zhangsan",30),(2,"lisi",40))
val df: DataFrame = rdd.toDF("id","name","age")
val rowRDD:RDD[Row] = df.rdd

// DataFrame <=> DataFrame
val ds:Dataset[User] = df.as[User]
val df1:DataFrame = ds.toDF()

// RDD <=> DataSet
rdd.map {
    case (id, name, age) =>{
        User(id, name, age)
    }
}
val userRDD:RAA[User] = ds1.rdd

 

 2.4.2 三者的共性

1. 都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2. 都有惰性机制,在创建、转换时,不会立即执行。只有在遇到行动算子时,才会开始运行

3. 有很多共同的函数

4. DataFrame 和 DataSet 许多操作都需要导入包:import spark.implicits._

5. 都会根据Spark的内存情况自动缓存运算,即使数据量很大,也不用担心内存溢出

6. 都有partition的概念

7.  DataFrame 和 DataSet 都可以使用匹配模式获取各个字段的值和类型

2.4.3 三者的区别

 1)RDD

  • RDD一般和spark mllib同时使用
  • RDD不支持sparkSQL操作

 2)DataFrame

  • RDD和DataFrame不同,DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过解析才能获取各个字段的值
  • DataFrame 和 DataSet 一般捕鱼spark mllib 同时使用
  • DataFrame 和 DataSet 都支持SparkSQL操作,如select,groupby等。同事也能注册临时表/视窗,进行sql语句操作。
  • DataFrame 和 DataSet支持一些方便的保存方式,比如保存成csv,可以带上表头

 3)DataSet

  • DataFrame 和 DataSet拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame其实就是DataSet的一个特例
  • DataFrame 也可以叫 DataSet[Row],每一行的类型是Row

三、SparkSQL连接Hive

object SparkHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkHive")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.153.139:9083")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("show databases").show()

    val ecd = spark.table("shopping.ext_customer_details")
    ecd.printSchema()
    ecd.show()

    println("============")

//    每个国家分别有多少员工
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val usernumdf = ecd
//    这里输出之后会自带表头。这是由于我们在hive中设置了删除表头'skip.header.line.count'='1'
//    所以需要过滤表头文件。
      .where($"customer_id"=!="customer_id")
//      .filter($"customer_id"=!="customer_id")
      .groupBy("country")
      .agg(count("customer_id").as("usernum"))
    usernumdf.printSchema()
    usernumdf.show(3)

    usernumdf.write.mode("append").saveAsTable("shopping.usernum")


    spark.close()
  }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据技术之Spark——Spark SQL 的相关文章

随机推荐

  • CATIA Composer软件安装包分享(附安装教程)

    目录 一 软件简介 二 软件下载 一 软件简介 CATIA Computer Aided Three dimensional Interactive Application 是一款由法国达索系统公司开发的三维计算机辅助设计 CAD 软件 它
  • latex 入门,包含图片、公式、添加目录、另起一页等用法

    文章目录 一 基础用法 1 2 3 二 图片插入 三 列表 四 公式 五 表格 六 自动生成目录 七 另起一页 一 基础用法 1 documentclass report 解决中文不显示 另外需要将compiler设置为XeLaTex us
  • 打造专业形象:公司怎么搭建一个网站

    在当今互联网时代 拥有一个专业的公司网站对于企业而言是非常重要的 一个优秀的网站不仅能够提升企业的形象 还能够吸引更多的客户并提高品牌知名度 那么 如何打造一个专业的公司网站呢 以下是一些步骤和案例来帮助您 第一步 确定网站目的和受众 在开
  • [PyTorch] 可视化tensor图像

    PyTorch提供了直接对tensor可视化的接口函数 plt imshow transforms ToPILImage image interpolation bicubic transforms ToPILImage image sho
  • Picgo+github上传图片报错

    Picgo github上传图片报错 查看picgo log日志文件 PicGo ERROR method PUT url https api github com repos statusCode 422 message Request
  • 海康、大华IpCamera RTSP地址和格式

    感谢作者 http blog csdn net byxdaz article details 51647267 实时流 海康 rtsp username password ip port codec channel subtype av s
  • 分享关于Linux驱动设备操作集file_operations

    前言 在上一篇文章中 我们学习了驱动的基本框架 这一章 我们会在上一章代码的基础上 继续对驱动的框架进行完善 要下载上一篇文章的全部代码 请点击这里 1 字符设备的四个基本操作 驱动让用户程序具备操作硬件设备的能力 那么对硬件设备有哪些操作
  • AR互动大屏项目实战

    1课程简介 mp4 2课程安排 mp4 3恐龙博物馆项目需求 mp4 4项目设计 mp4 5资源需求 模型流程及外包标准 mp4 6模型动画导入 动画流程及外包标准 mp4 7项目场景搭建 简易手段拍照并使用PS制作全景天空盒 mp4 8A
  • web前端技术笔记(十五)json、本地存储jqueryUI和移动端JS

    jquery json 配置服务器环境 ajax与jsonp 同步和异步 局部刷新和无刷新 同源策略 ajax使用方法 jsonp 360联想词案例 本地存储 jquery 设置cookie localStorage sessionStor
  • linux下创建c文件

    1 在终端中输入 vim c 进入vim编辑器 2 按 i 键进入输入模式 进行c语言代码的输入 输入结束后 按 ESC 进入命令模式 3 输入wq 回车退出vim编辑
  • 前端数据打点(埋点)

    一 埋点概念 埋点就是在应用中特定的流程收集一些信息 用来跟踪应用使用的状况 后续用来进一步优化产品或是提供运营的数据支撑 包括访问数 Visits 访客数 Visitor 停留时长 Time On Site 页面浏览数 Page View
  • 【问题解决】Gitee+jenkins提示”could not read Username for ‘https://gitee.com‘: No such device or address“

    一直显示这个 然后才发现是自己的仓库是私有的 将仓库设置为公开即可解决
  • C89标准库头文件——非本地跳转的应用

    setjmp函数用于设置跳转的目的位置 longjmp函数进行跳转 env 保留了需要返回的位置的堆栈情况 setjmp的返回值 直接调用该函数 则返回0 若由longjmp的调用 导致setjmp被调用 则返回status longjmp
  • SSRF漏洞(原理、挖掘点、漏洞利用、修复建议)

    一 介绍SSRF漏洞 SSRF Server Side Request Forgery 服务器端请求伪造 是一种由攻击者构造请求 由服务端发起请求的安全漏洞 一般情况下 SSRF攻击的目标是外网无法访问的内部系统 正因为请求是由服务端发起的
  • matlab中的strfind和findstr函数

    一 strfind函数 函数用法说明 strfind s1 s2 or strfind s1 pattern 说明 在s1中搜索pattern 例子 相关解答 function r myfun2 rand seed 2301 c 97 fi
  • PS如何将图片处理成特定像素(以标准的2寸照片为例)

    生活中我们在网站上注册信息需要上传个人照片时 常常遇到照片不符合网站要求等情况 今天我们以2寸照片 即626 高 413 宽 像素为例 来教大家如何用PS将照片处理成特定像素 1 裁剪照片 首先 拿到照片第一步 先裁剪照片至相应规格 打开P
  • mfc中添加按钮对应的处理函数四种方法

    方法一 双击按钮自动添加处理函数 自动以 OnBnClicked 开头 ID 结尾命名 这种方法 最简单 但无法修改函数名称 只能生成默认的按钮按下消息的函数 方法二 类向导 Ctrl p z 对按钮右键选择 类向导 在消息栏中选择默认的
  • 牛客sql练习二

    11获取所有员工当前的manager 题目描述 获取所有员工当前的manager 如果当前的manager是自己的话结果不显示 当前表示to date 9999 01 01 结果第一列给出当前员工的emp no 第二列给出其manager对
  • NepCTF 2022 MISC <签到题>(极限套娃)

    题目链接 CTFm 这道题融合了图片隐写 py脚本编写和usb流量分析 下载题目附件 是个gz压缩包 无加密直接打开 里面是一张图片 图片长这样 盲猜图片隐写 打开虚拟机使用binwalk工具康康 确实有多文件包含 binwalk e xx
  • 大数据技术之Spark——Spark SQL

    一 SparkSQL 概述 1 1 SparkSQL是什么 Spark SQL是Spark用于结构化数据处理的Spark模块 1 2 Hive and SparkSQL 我们之前学习过hive hive是一个基于hadoop的SQL引擎工具