1. Spark-SQL 基本操作
1.1 需求
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json
{ "id":1 , "name":"Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age·":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Scala语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id字段;
(4) 筛选出age>30的记录;
(5) 将数据按age分组;
(6) 将数据按name升序排列;
(7) 取出前3行数据;
(8) 查询所有记录的name列,并为其取别名为username;
(9) 查询年龄age的平均值;
(10) 查询年龄age的最小值。
1.2 实现
** 我保存的文件路径:/test/employee.json
1) 为employee.json创建DataFrame
在scala命令行中输入如下语句:
import org.apache.spark.sql.SparkSession
import spark.implicits._
// 创建一个spark sql对象
val spark = SparkSession.builder().getOrCreate()
// 读取本地json文件,并得到df
val df = spark.read.json("file:///test/employee.json")
// 展示df的所有内容
df.show()
2) 把表employeesDF注册为临时表
df.createOrReplaceTempView("e")
(1)查询所有数据
spark.sql("select * from e").show
(2) 查询所有数据,并去除重复的数据;
spark.sql("select distinct * from e").show
(3) 查询所有数据,打印时去除id字段;
spark.sql("select age, name from e").show
(4) 筛选出age>30的记录;
spark.sql("select * from e where age > 30").show
(5) 将数据按age分组;
spark.sql("select count(age) from e group by age").show
(6) 将数据按name升序排列;
spark.sql("select * from e order by name asc").show
(7) 取出前3行数据;
spark.sql("select * from e limit 3").show
(8) 查询所有记录的name列,并为其取别名为username;
spark.sql("select name as username from e").show
(9) 查询年龄age的平均值;
spark.sql("select avg(age) from e").show
(10) 查询年龄age的最小值。
spark.sql("select min(age) from e").show
1.3 运行结果
2. 编程实现将RDD转换为DataFrame
2.1 需求
请将数据students_data.txt,实现从RDD转换得到DataFrame,并将表中的内容输出。请写出程序代码。
2.2 实现
在scala命令行中输入如下语句:
// 导包
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
// 定义一个Student类
case class Student(id:Int, name:String, gender:String, age:Int, course_id:Int, score:Double, classes:String)
// 读取文件
val df1 = spark.sparkContext.textFile("file:///test/students_data.txt")
// 以逗号为分隔符分割文字
val df2 = df1.map(_.split(","))
// 将df2转换为DataFrame格式的df3
val df3 = df2.map(attributes => Student(attributes(0).trim.toInt, attributes(1), attributes(2), attributes(3).trim.toInt, attributes(4).trim.toInt, attributes(5).trim.toDouble, attributes(6))).toDF()
// 必须注册为临时表才能供下面的查询使用
df3.createOrReplaceTempView("s")
// 使用spark-sql 语句展示表格内容
spark.sql("select * from s").show
2.3 运行结果