Spark sql的简单使用

2023-10-26

加载依赖

依赖这个东西,只要注意几个依赖之间的相互关系能够匹配的上就行了,这里需要在idea里面写sql,只需要加上一个spark_sql的依赖就行了
这里的2.11是Scala的版本,如果本地也有Scala,要注意互相之间依赖的关联,如果不太清楚,直接去maven的官网去搜就可了maven官网
在这里插入图片描述

spark sql简单入门

sparksql实际上就是把写出来的sql语法加载成RDD,交到集群中去运行;spark sql可以很好的集成hive,在里面也可以写hivesql,spark core中提供的数据结构是RDD,spark sql提供的数据结构是DataFrame,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格;DataFrame可以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表。

在这里插入图片描述

Spark sql简单应用

应用配置

这里的SparkSession是Spark2.0入口,SparkContext是1.0入口

/**
      * SparkSession 是Spark2.0的新入口
      */
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo1SparkSession")
      //设置spark sql产生shuffle后默认的分区数 => 并行度
      // 默认是200,也就是说,这里如果不修改,默认有200个reduce
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()

读取文件

读取文本文件并展示数据

这里读取的不管是csv文件还是txt文件,默认都是csv读取,不过这里要手动指定列头

展示数据这里用到的是show(),不过默认只显示20条,需要显示多要自己加参数

//不管是txt还是csv格式,这里默认都是以csv读取
    spark
      .read
      //这里要手动指定列名,不然列头是_c0,_c1,_c2这样的格式
        .schema("id Int,name String,age Int,gender String,clazz String")
      .csv("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
      .show()

show()将数据完全显示

假如我们这里的数据非常长
在这里插入图片描述
不能被显示完全,就需要在show()中加一个参数
show(false)即可
在这里插入图片描述

读取json文件

//读取json数据
    spark
      .read
//        .format("json")
//      .load("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\一份json数据.json")
      .json("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\一份json数据.json")
      .show()

读取jdbc文件

假设我们这里不知道该如何使用,可以从官网中一步步读取
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这里面就包含了各个参数的作用,以及示例代码
在这里插入图片描述

//jdbc读取MySQL数据
    spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306/test1")
      .option("dbtable", "emp")
      .option("user", "xxxx")
      .option("password", "xxxxxx")
      .load()
      .show()

在这里插入图片描述

读取压缩格式的文件

//读取压缩格式的文件
    spark
      .read
      .format("orc")
      .load("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\Spark\\data\\parquet")
      .show()

将数据以压缩格式存储

parquet或者orc格式存储
spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306/test1")
      .option("dbtable", "emp")
      .option("user", "xxxx")
      .option("password", "xxxxxx")
      .load()
      .write
      .parquet("spark/data/parquet")

spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306/test1")
      .option("dbtable", "emp")
      .option("user", "xxxx")
      .option("password", "xxxxxx")
      .load()
      .write
      .mode(SaveMode.Overwrite)
      .orc("spark/data/parquet")

读取数据注册成视图并写SQL

我们可以看到,sparksql可以读取csv、json、jdbc的数据源,并且可以在自身的DatFrame中使用类SQL语言和HiveSQL
在这里插入图片描述

直接写sql

val stuDF: DataFrame = spark
      .read
      //这里要手动指定列名,不然列头是_c0,_c1,_c2这样的格式
      .schema("id Int,name String,age Int,gender String,clazz String")
      .csv("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

    //直接将DataFrame注册成临时视图view
    stuDF.createOrReplaceTempView("stu")
    spark.sql("select * from stu where age>23").show()

在这里插入图片描述

类sql的模式

//类sql的方式,介于sql和代码中间的API
    stuDF.where("age > 23")
      .select("name","id","clazz")
      .show()

在这里插入图片描述

写出文件到别的路径

//统计班级人数
    //需要写出文件到别的地方,就需要write
    stuDF.groupBy("clazz")
      .count()
      .write
      //保存的时候可以指定保存的方式
      //Overwrite 覆盖
      //Append 追加
      .mode(SaveMode.Overwrite)
      .save("spark/data/clazz_cnt")

RDD和DF的相互转换

RDD转换成DF

如果是RDD读取文本文件的数据,比如是学生表的数据,有id,name,age这样的字段,建议将学生信息写成一个样例类,再存入RDD,再转换成DF

//这里通过RDD的方式构建DF,推荐使用样例类来做
    val sc: SparkContext = spark.sparkContext
    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

    val stuRDDMap: RDD[Student] = stuRDD.map(
      stu => {
        val strings: Array[String] = stu.split(",")
        val id: String = strings(0)
        val name: String = strings(1)
        val age: String = strings(2)
        val gender: String = strings(3)
        val clazz: String = strings(4)
        Student(id, name, age, gender, clazz)
      }
    )

    //导入隐式转换
    import spark.implicits._
    val sDF: DataFrame = stuRDDMap.toDF()
    sDF.show()

    val ssDF: DataFrame = stuRDD.toDF()
    ssDF.show(false)

  }

  case class Student(id: String, name: String, age: String, gender: String, clazz: String)

不进行这样操作的话,读取到的数据会是这样的:
在这里插入图片描述
转换成样例类进行读取的话是这样的:这样更方便对数据进行操作
在这里插入图片描述

DF转换成RDD

val rdd: RDD[Row] = sDF.rdd
    rdd.foreach(
      rdd=>{
        val id: String = rdd.getAs[String]("id")
        val name: String = rdd.getAs[String](1)
        println(s"$id,$name")
      }
    )

在这里插入图片描述

rdd.map(rdd=>{
      val id: String = rdd.getAs[String]("id")
      val name: String = rdd.getAs[String]("name")
      (id,name)
    }).foreach(println)

在这里插入图片描述

DF中函数的使用

导入数据表的时候记得指定字段

val spark: SparkSession = SparkSession
      .builder()
      .config("spark.sql.shuffle.partitions", 2)
      .master("local")
      .appName("Demo3DFAPI")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._
    // 导入所有的sql函数
    import org.apache.spark.sql.functions._

    val stuDF: DataFrame = spark
      .read
      .schema("id Int,name String,age Int,gender String,clazz String")
      .csv("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

where

推荐使用列表达式的形式

//字符串表达式
    stuDF.where("age > 21").show()
    //列表达式
    stuDF.where($"age" > 21).show()

filter

//filter函数
    stuDF.filter(stu=>{
      val id: Int = stu.getAs[Int]("id")
      if (id < 1500100007){
        true
      }
      else {
        false
      }
    })
      .show()

select

stuDF.select( $"id" ,$"name",$"age" + 100 as "new_age").show()

聚合函数的位置(agg)

这里面在分组之后,要将聚合函数放进agg中才可以使用

//统计每个班不同性别的人数(直接统计人数)
 stuDF.groupBy($"clazz",$"gender")
      .agg(count($"gender")).show()

    stuDF.groupBy($"clazz",$"gender")
      .agg(countDistinct($"gender")).show()

join

//join
    //默认时inner join 可以指定关联的方式
    //关联字段一样时
    stuDF.join(scoDF,"id").show()
    //关联字段不一样时 
    stuDF.join(scoDF,$"id"===$"sid","left").show()

直接写sql

//直接写sql
    stuDF.createTempView("students")
    spark.sql(
      """
        |select
        |   clazz
        |   ,count(distinct id)
        |from students
        |group by clazz
      """.stripMargin).show()

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark sql的简单使用 的相关文章

  • Django 模型 - 外键作为主键

    我有以下2张表 在 models py 中 class Foo models Model uuid models CharField UUID primary key True default uuid4 and class FooExt
  • 如何对主索引重新编号

    我有一个简单的 MySQL 表 主索引 id 不是一一编号的 1 31 35 100 等 我希望它们的编号如 1 2 3 4 请告诉我该怎么做 我还想指出的是 我知道该操作可能产生的后果 但我只是想整理一下表格 我同意其他方法也可以 但我只
  • Java/Hibernate - 异常:内部连接池已达到其最大大小,当前没有可用的连接

    我第一次在大学项目中使用 Hibernate 而且我还是个新手 我想我遵循了我的教授和我阅读的一些教程给出的所有指示 但我不断收到标题中的异常 Exception in thread main org hibernate Hibernate
  • 优化mysql中日期类型字段的查询

    我目前准备了以下查询 select sum amount as total from incomes where YEAR date 2019 and MONTH date 07 and incomes deleted at is null
  • 如何使组合键唯一?

    I am making a database of students in one school Here is what I have so far 如果您不喜欢阅读 请跳至 简而言之 部分 问题是我对这个设计并不满意 我想要的组合gra
  • 仅当所有记录都匹配时 SQL 连接

    我有3张桌子 CP carthead idOrder CP cartrows idOrder idCartRow CP shipping idCartRow idShipping dateShipped 每个 idOrder 可以有多个 i
  • 获取列名称以及 JSON 响应

    我有三个实体类 我编写了包含两个表的联接的查询 表 费用类别 Entity Table name ExpensesCategories public class ExpensesCategories Id GeneratedValue st
  • Crystal Reports 相当于“WHERE”

    我熟悉 SQL 但不熟悉 Crystal Reports 我正在尝试处理包含 5 列的导入数据集 id deathDate giftDate giftAmount Dead 123 2008 01 06 2011 09 08 25 00 T
  • 什么时候应该使用 XML 而不是 SQL? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • SQL/C# - UPSERT 上的主键错误

    UPDATE 简化的问题 从问题中删除了 C 在以下情况下 如何编写一个可以识别两行相同的 UPSERT 看看怎么有一个 b 退格键 在那里编码 奇怪的小字符 SQL 将它们视为相同 虽然我的 UPSERT 将此视为new data并在应该
  • PostgreSQL:有效地将 JSON 数组拆分为行

    我有一个表 表 A 其中包含一个包含 JSON 编码数据的文本列 JSON 数据始终是一个包含一到几千个普通对象的数组 我有另一个表 表 B 其中有几列 包括数据类型为 JSON 的列 我想从表 A 中选择所有行 将 json 数组拆分为其
  • T-SQL 中是否有 LIKE 语句的替代方案?

    我有一个场景我需要执行以下操作 SELECT FROM dbo MyTable WHERE Url LIKE
  • SQL如何将两个日期之间一小时内的事件相加并显示在一行中

    我正在使用 C 和 SQL Server 2005 开发一份报告 我只需显示我们每小时获得的点击次数 桌子很大 输出应如下所示 Row Date Time Hit Count 1 07 05 2012 8 00 3 2 07 05 2012
  • MYSQL:SQL查询获取自增字段的值

    我有一张桌子 主键是id及其自动递增 现在 当我插入新记录时 我需要获取更新记录的 id 我怎样才能做到这一点 如果我使用查询 select max id from table name 执行后我可以获得id 但我能确定它是刚刚插入的记录的
  • 存储过程总是返回0

    我试图从存储过程获取返回值 但它总是返回 0 c code cmd new SqlCommand cmd CommandType CommandType StoredProcedure cmd CommandText AbsentEntry
  • ORACLE:未找到数据——但数据存在

    调试包过程 当实际上有数据时却找不到数据 仅测试 SELECT SELECT trim trailing from GL SECURITY as DUMMY FROM b2k user b2k WHERE sms username FUCH
  • 在 Access 数据库中对列包含数字和字母的数据进行排序

    请帮助我 因为我一直无法做到这一点 选择此列 columnA 的访问 SQL 是什么 以便它返回一个结果集 其中的不同值首先根据数字排序 然后根据字母排序 这是列值 10A 9C 12D 11G 9B 10C 9R 8T 我尝试过 从 tb
  • 什么会导致 Oracle ROWID 更改?

    AFAIK Oracle 中的 ROWID 表示相应数据文件中记录的物理位置 在什么情况下记录的ROWID可能会改变 我所知道的一个是分区表上的更新 它将记录 移动 到另一个分区 还有其他情况吗 我们的大多数数据库都是 Oracle 10
  • 如何使用sql脚本更改列的属性

    如何使用 sql 脚本更改列的属性 这是我尝试过但出现错误的方法 ALTER TABLE dbo tblBiometricPattern COLUMN BiometricPatternID TINYINT NOT NULL IDENTITY
  • 每行中非空列的计数

    我有一个包含 4 列的表 在第 5 列中我想存储前 4 列中有多少个非空列的计数 例如 其中 X 是任意值 Column1 Column2 Column3 Column4 Count X X NULL X 3 NULL NULL X X 2

随机推荐