【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户

2023-05-16

🚀 作者 :“大数据小禅”

🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪

手机流量日志处理

        • SparkSQL简介
        • 依赖引入
        • SparkSQL快速入门案例
        • 手机流量日志数据格式与处理要求
        • 处理程序

SparkSQL简介

  • Spark SQL是Apache Spark的一个模块,提供了一种基于结构化数据的编程接口。它允许用户使用SQL语句或DataFrame API来查询和操作数据,同时还支持使用Spark的分布式计算引擎进行高效的并行计算。

  • Spark SQL支持多种数据源,包括Hive、JSON、Parquet、Avro、ORC等,这些数据源可以通过DataFrame API或SQL语句进行查询和操作。同时,Spark SQL还提供了一些高级功能,如窗口函数、聚合函数、UDF等,以满足更复杂的数据分析需求。

  • Spark SQL还支持将SQL查询结果写入到外部数据源,如Hive表、JSON文件、Parquet文件等。此外,Spark SQL还提供了一些工具,如Spark SQL CLI、JDBC/ODBC驱动程序等,方便用户进行交互式查询和数据分析。

  • 使用前需要新引入对应依赖

依赖引入

使用Spark SQL需要在项目中添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>

其中,spark-sql_2.12是Spark SQL的核心依赖,spark-core_2.12是Spark的核心依赖。注意,版本号可以根据实际情况进行调整。

如果需要使用其他数据源,如MySQL、Hive等,则需要添加相应的依赖。例如,如果需要连接MySQL数据库,则需要添加以下依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

其中,spark-sql-kafka-0-10_2.12是连接Kafka数据源的依赖,mysql-connector-java是连接MySQL数据库的依赖。注意,版本号也可以根据实际情况进行调整。

以上是使用Maven进行依赖配置的方式。

SparkSQL快速入门案例

  • 准备数据
  • 我们假设有一个CSV文件employee.csv,包含了员工的信息,如下所示:
id,name,age,gender,salary
1,Jack,25,M,5000
2,Lucy,28,F,6000
3,Tom,30,M,8000
4,Lily,27,F,7000
5,David,32,M,9000

创建SparkSession对象
首先,我们需要创建一个SparkSession对象,它是Spark SQL的入口点。可以使用以下代码创建SparkSession对象:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()
//加载CSV文件
//使用SparkSession对象的read方法加载CSV文件:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")
//其中,header=true表示第一行是列名,inferSchema=true表示自动推断列的数据类型。

//创建临时表
//使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时表:

df.createOrReplaceTempView("employee")
//执行SQL查询
//使用SparkSession对象的sql方法执行SQL查询:
val result = spark.sql("SELECT * FROM employee WHERE age > 27")
这将返回所有年龄大于27岁的员工信息。

//输出结果
//使用DataFrame的show方法输出查询结果:

result.show()
//这将输出所有符合条件的员工信息。
  • 完整代码如下:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("Spark SQL Demo")
  .getOrCreate()

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employee.csv")

df.createOrReplaceTempView("employee")

val result = spark.sql("SELECT * FROM employee WHERE age > 27")

result.show()
输出结果:

+---+----+---+------+-----+
| id|name|age|gender|salary|
+---+----+---+------+-----+
|  2|Lucy| 28|     F| 6000|
|  3| Tom| 30|     M| 8000|
|  5|David| 32|     M| 9000|
+---+----+---+------+-----+

手机流量日志数据格式与处理要求

  • 日志字段与字段说明如下
    在这里插入图片描述1.需要实现的需求1.按月统计流量使用量最多的用户(每个月使用流量最多的用户)
    2.将结果数据持久化到硬盘

处理程序

/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
// 时间戳         手机号码          基站物理地址             ip        接受数 接受数据包 上行流量  下行流量  状态码
//2020-03-10	15707126156	QK-X7-7N-G2-1N-QZ:CMCC	212.188.187.220	33	     40	    67584	   81920	200
//使用量 =上+下  手机号码就是用户   RDD处理方式->((月,号码),(上行+下行))
//1.下载手机流量日志
//2.按月统计流量使用量最多的用户
//3.将结果数据持久化到硬盘
object LogPhone {
  System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.3\\hadoop-2.7.3")
  def main(args: Array[String]): Unit = {
    //1.创建sparksession
    val sc = new sql.SparkSession.Builder()
      .appName("test")
      .master("local[6]")
      .config("spark.testing.memory", "471859201")
      .getOrCreate()
    // 读取输入文件
    val log = sc.sparkContext.textFile("dataset\\phone.log")
    val value = log.map(_.split("\t")).filter(arr => {
      !(arr(1) == null)
    }).map(tmp => {
      //处理日期 获取月份
      val month: String = tmp(0).split("-")(1)
      //号码
      val user = tmp(1)
      //使用流量数
      var use = tmp(6) + tmp(7)
      Log(user, use.toLong, month)
    })
    sc.createDataFrame(value).createOrReplaceTempView("log")
    //每个月流量使用做多的用户 group by行数会减少,开窗函数over()行数不会减少
    val data: DataFrame = sc.sql("select user,month,useall from " +
      "(select user,month,sum(use) over(partition by user,month order by use desc) as useall," +
      "dense_rank() over(partition by month order by use desc) as rn from log)t1 where rn=1 order by month")
    data.show()
    data.write.parquet("dataset\\output\\directory")

    sc.close()
  }
}

/**
  * @Description
  * @Author xiaochan
  * @Version 1.0
  */
case class Log(
    user: String,
    use: Long,
    month: String)


  • 结果如下
    在这里插入图片描述

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Spark手机流量日志处理】使用SparkSQL按月统计流量使用量最多的用户 的相关文章

  • 数据倾斜

    数据倾斜发生时的现象 1 绝大多数task执行得都非常快 但个别task执行的极慢 2 原本能正常执行的Spark作业 某天突然爆出OOM 内存溢出 异常 观察异常栈 是我们写的业务代码造成的 数据倾斜发生的原理 在进行shuffle的时候
  • windows下安装spark及hadoop

    windows下安装spark 1 安装jdk 2 安装scala 3 下载spark spark下载地址 3 1安装spark 将下载的文件解压到一个目录 注意目录不能有空格 比如说不能解压到C Program Files 作者解压到了这
  • spark集群搭建与mysql元数据管理

    找个spark集群搭建是针对于上一篇hadoop的基础上搭建的 所以spark的版本也是要按照着hadoop版本进行下载 1 解压spark 修改spark的 etc profile的home目录 2 安装SCALA 并配置SCALA HO
  • Spark性能调优之Shuffle调优

    Spark性能调优之Shuffle调优 Spark底层shuffle的传输方式是使用netty传输 netty在进行网络传输的过程会申请堆外内存 netty是零拷贝 所以使用了堆外内存 shuffle过程中常出现的问题 常见问题一 redu
  • Spark基础知识(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 一 Spark概述 Spark模块 Core SQL Streami
  • SparkSession和sparkSQL

    一 概述 spark 有三大引擎 spark core sparkSQL sparkStreaming spark core 的关键抽象是 SparkContext RDD SparkSQL 的关键抽象是 SparkSession Data
  • Spark DataFrame的Join操作和withColumn、withColumnRenamed方法实践案例(Scala Demo代码)

    import org apache log4j Level Logger import org apache spark sql SparkSession import org apache spark sql functions obje
  • 深入理解 SQL 中的 Grouping Sets 语句

    前言 SQL 中 Group By 语句大家都很熟悉 根据指定的规则对数据进行分组 常常和聚合函数一起使用 比如 考虑有表 dealer 表中数据如下 id Int city String car model String quantity
  • 浅谈Hadoop体系和MPP体系

    浅谈Hadoop体系和MPP体系 引言 如题 在大数据发展至今 为了应对日益繁多的数据分析处理 和解决客户各种奇思妙 怪 想需求 形形色色的大数据处理的框架和对应的数据存储手段层出不穷 有老当益壮的Hadoop体系 依靠Hadoop巨大的社
  • spark报Got an error when resolving hostNames. Falling back to /default-rack for all

    一 报错代码如下 21 06 01 20 13 36 INFO yarn SparkRackResolver Got an error when resolving hostNames Falling back to default rac
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • 大数据手册(Spark)--Spark基本概念

    文章目录 Spark 基本概念 Hadoop 生态 Spark 生态 Spark 基本架构 Spark运行基本流程 弹性分布式数据集 RDD Spark安装配置 Spark基本概念 Spark基础知识 PySpark版 Spark机器学习
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • spark SQL基础教程

    1 sparkSQL入门 sparksql专门用于处理结构化的数据 而RDD还可以处理非结构化的数据 sparksql的优点之一是sparkfsql使用统一的api读取不同的数据 第二个优点是可以在语言中使用其他语言 例如python 另外
  • python+django基于Spark的国漫画推荐系统 可视化大屏分析

    国漫推荐信息是现如今社会信息交流中一个重要的组成部分 本文将从国漫推荐管理的需求和现状进行分析 使得本系统的设计实现具有可使用的价 做出一个实用性好的国漫推荐系统 使其能满足用户的需求 并可以让用户更方便快捷地国漫推荐 国漫推荐系统的设计开

随机推荐