🚀 作者 :“大数据小禅”
🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪
手机流量日志处理
- SparkSQL简介
- 依赖引入
- SparkSQL快速入门案例
- 手机流量日志数据格式与处理要求
- 处理程序
SparkSQL简介
-
Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。
-
Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。
-
Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。
-
使用前需要新引入对应依赖
依赖引入
使用Spark SQL需要在项目中添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。
如果需要使用其他数据源,如MySQL、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。
以上是使用Maven进行依赖配置的方式。
SparkSQL快速入门案例
- 准备数据
- 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,5000
2,Lucy,28,F,6000
3,Tom,30,M,8000
4,Lily,27,F,7000
5,David,32,M,9000
创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("Spark SQL Demo")
.getOrCreate()
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employee.csv")
df.createOrReplaceTempView("employee")
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。
result.show()
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.appName("Spark SQL Demo")
.getOrCreate()
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employee.csv")
df.createOrReplaceTempView("employee")
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
result.show()
输出结果:
+---+----+---+------+-----+
| id|name|age|gender|salary|
+---+----+---+------+-----+
| 2|Lucy| 28| F| 6000|
| 3| Tom| 30| M| 8000|
| 5|David| 32| M| 9000|
+---+----+---+------+-----+
手机流量日志数据格式与处理要求
- 日志字段与字段说明如下
1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户)
2.将结果数据持久化到硬盘
处理程序
object LogPhone {
System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.3\\hadoop-2.7.3")
def main(args: Array[String]): Unit = {
val sc = new sql.SparkSession.Builder()
.appName("test")
.master("local[6]")
.config("spark.testing.memory", "471859201")
.getOrCreate()
val log = sc.sparkContext.textFile("dataset\\phone.log")
val value = log.map(_.split("\t")).filter(arr => {
!(arr(1) == null)
}).map(tmp => {
val month: String = tmp(0).split("-")(1)
val user = tmp(1)
var use = tmp(6) + tmp(7)
Log(user, use.toLong, month)
})
sc.createDataFrame(value).createOrReplaceTempView("log")
val data: DataFrame = sc.sql("select user,month,useall from " +
"(select user,month,sum(use) over(partition by user,month order by use desc) as useall," +
"dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
data.show()
data.write.parquet("dataset\\output\\directory")
sc.close()
}
}
case class Log(
user: String,
use: Long,
month: String)
- 结果如下
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)