Spark简介
什么是Spark?
Apache Spark是一种多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。
Spark的安装
检查
检查HDFS、YARN环境
[vagary@vagary ~]$ jps
4736 NameNode
5490 NodeManager
5106 SecondaryNameNode
4870 DataNode
5881 Jps
5375 ResourceManager
检查Java环境
[vagary@vagary ~]$ java -version
java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
Scala部署
下载Scala安装包
这些安装包在官网上都有
https://www.scala-lang.org/download/all.html,然后这里我们安装2.12.15版本的Scala
[vagary@vagary software]$ wget https://downloads.lightbend.com/scala/2.12.15/scala-2.12.15.tgz
解压Scala安装包
[vagary@vagary software]$ tar -zxvf scala-2.12.15.tgz -C ../app
创建软连接
[vagary@vagary app]$ ln -s scala-2.12.15 scala
配置环境变量
编辑全局变量文件,/etc/profile
[root@vagary ~]# vi /etc/profile
然后将环境变量加入
export SCALA_HOME=/home/vagary/app/scala
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$PATH
配置完成退出后,使环境变量生效:
[root@vagary ~]# source /etc/profile
然后验证一下,出现版本号就表示配置成功:
[root@vagary ~]# scala -version
Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.
Spark部署
首先下载Spark安装包,从官网https://spark.apache.org/downloads.html上看版本很多,这里我们选择3.2.1的版本进行下载
注:我们Hadoop版本是3.1.3,然后这里下的spark是3.2.1,因为是自己学的,可以这么去用,如果服务器级别的开发是不能这么下载的,还是要下对应版本,或者预编译的。
预编译版的Spark
预编译好的,没有我们要的版本,所以我们要下载,从官网上下
https://spark.apache.org/downloads.html,版本很多,这里我们就用3.2.1,然后选择包类型为Source code,然后进行下载
解压之后进入dev目录下,修改make-distribution.sh文件
[vagary@vagary dev]$ vi make-distribution.sh
限定好版本
VERSION=3.2.1
SCALA_VERSION=2.12.15
SPARK_HADOOP_VERSION=3.1.3
SPARK_HIVE=1
修改Scala版本
[vagary@vagary dev]$ ./change-scala-version.sh 2.12
然后进行编译就可以了,这个过程会有点漫长~
[vagary@vagary spark-3.2.1]$ ./dev/make-distribution.sh --name 3.2.1-hadoop3.1.3 --tgz -Phive -Phive-thriftserver -Pyarn -Dhadoop.version=3.1.3 -Dscala.version=2.12.15
解压
将安装包解压在app目录下
[vagary@vagary software]$ tar -zxvf spark-3.2.1-bin-hadoop3.2.tgz -C ../app
创建软连接
[vagary@vagary app]$ ln -s spark-3.2.1-bin-hadoop3.2 spark
配置环境变量
编辑全局变量文件,/etc/profile
[root@vagary ~]# vi /etc/profile
然后将环境变量加入
export SPARK_HOME=/home/vagary/app/spark
export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$SPARK_HOME/bin:$PATH
配置完成退出后,使环境变量生效:
[root@vagary ~]# source /etc/profile
然后验证一下,出现版本号就表示配置成功:
[root@vagary ~]# scala -version
Scala code runner version 2.12.15 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.
开始添加Spark配置信息
进入spark/conf文件夹,会看到所有文件都是template,一般情况都是拷贝一个,然后进行修改(避免修改错误,尽量保存原本文件,随时恢复)
[vagary@vagary conf]$ cp spark-env.sh.template spark-env.sh
然后打开spark-env.sh文件添加配置
SPARK_CONF_DIR=/home/vagary/app/spark/conf
HADOOP_CONF_DIR=/home/vagary/app/hadoop/etc/hadoop
YARN_CONF_DIR=/home/vagary/app/hadoop/etc/hadoop
添加MySQL驱动
这里直接拿之前hive的MySQL驱动包就行了
[vagary@vagary lib]$ cp mysql-connector-java-5.1.27-bin.jar ../../spark/jars/
添加hive配置文件
将之前的hive的conf/hive-site.xml文件,拷贝到spark/conf下就行了
[vagary@vagary spark]$ cp ../hive/conf/hive-site.xml ./conf/
测试是否安装部署完成
运行一个测试圆周率的实例,如果出现结果,就说明部署完成
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /home/sqq/app/spark/examples/jars/spark-examples*.jar 10
其实在跑一个实例任务的时候,会一闪而过一个web端的,但是任务结束就消失了,所以我们需要取启动History Server ,保留跑完的服务;
配置History Server
复制一份spark-defaults.conf出来,进行修改
[vagary@vagary conf]$ cp spark-defaults.conf.template spark-defaults.conf
我们先去hdfs上创建一个目录,存放spark的日志文件
[vagary@vagary conf]$ hdfs dfs -mkdir hdfs://vagary:9000/spark-log
[vagary@vagary conf]$ hdfs dfs -ls hdfs://vagary:9000/
Found 5 items
drwxr-xr-x - vagary supergroup 0 2022-03-20 17:45 hdfs://vagary:9000/delete
drwxr-xr-x - vagary supergroup 0 2022-04-10 02:46 hdfs://vagary:9000/spark-log
将这两个配置信息的注释符去掉,然后将目录修改为我们刚刚创建的那个目录
spark.eventLog.enabled true
spark.eventLog.dir hdfs://vagary:9000/spark-log
然后修改spark-env.sh文件,加上History Server的配置
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://vagary:9000/spark-log -Dspark.history.ui.port=7777"
查看当前状态下的启动的服务有哪些
[vagary@vagary conf]$ jps
1377 SecondaryNameNode
6433 Jps
1121 DataNode
988 NameNode
1822 NodeManager
1662 ResourceManager
到sbin目录下,启动History Server
[vagary@vagary sbin]$ ./start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/vagary/app/spark/logs/spark-vagary-org.apache.spark.deploy.history.HistoryServer-1-vagary.out
jps查看一下,可以看到History Server是启动好的
[vagary@vagary sbin]$ jps
1377 SecondaryNameNode
1121 DataNode
6723 HistoryServer
988 NameNode
6797 Jps
1822 NodeManager
1662 ResourceManager
然后再去web端查看一下,可以看到是启动成功了
这里再跑一下实例,查看web端状态,可以看到刚跑的实例在这里是记录日志得了。
wordcount实例
先将文件上传到hdfs上
[vagary@vagary ~]$ hdfs dfs -put /home/vagary/wcdata.txt hdfs://vagary:9000/data/
2022-04-10 03:21:34,402 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
然后到bin目录下,启动spark-shell.sh脚本
[vagary@vagary bin]$ ./spark-shell
读一个文件将结果打印出来
scala> sc.textFile("/data/wcdata.txt").foreach(println)
aaa;bbb;ccc (0 + 2) / 2]
aaa;bbb;ddd
aaa;java;hadoop
hadoop;java;spark;scala
scala;spark;java
以分号进行切割,然后打印出来
scala> sc.textFile("/data/wcdata.txt").flatMap(_.split(";")).foreach(println)
aaa
bbb
ccc
aaa
bbb
ddd
aaa
java
hadoop
hadoop
java
spark
scala
scala
spark
java
等同于这样子写法
scala> sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).foreach(println)
aaa
bbb
ccc
aaa
bbb
ddd
aaa
java
hadoop
hadoop
java
spark
scala
scala
spark
java
拆成k-v键值对形式
scala> sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).map((_,1)).foreach(println)
(scala,1)
(spark,1)
(java,1)
(aaa,1)
(bbb,1)
(ccc,1)
(aaa,1)
(bbb,1)
(ddd,1)
(aaa,1)
(java,1)
(hadoop,1)
(hadoop,1)
(java,1)
(spark,1)
(scala,1)
最后进行值汇总
scala> sc.textFile("/data/wcdata.txt").flatMap(x => x.split(";")).map((_,1)).reduceByKey(_+_).foreach(println)
(scala,2)
(bbb,2)
(ddd,1)
(java,3)
(spark,2)
(hadoop,2)
(ccc,1)
(aaa,3)
注释:其中flatMap类似于 map,但每个输入项可以映射到 0 个或更多输出项(因此func应该返回一个 Seq 而不是单个项);
foreach,对数据集的每个元素运行函数func。这通常是针对副作用进行的,例如更新累加器或与外部存储系统交互。
yarn-client/yarn-cluster
最大区别:Driver运行地方不同;
client运行在本地客户端,Spark-submit启动Driver线程;
Cluster模式下,启动,会将任务丢给yarn,向yarn申请一台机器运行;
Driver/Executor
Driver:
①、driver进程就是应用的main()函数并且构建sparkContext对象,当我们提交了应用之后,便会启动一个对应的driver进程, driver本身 会根据我们设置的参数占有一定的资源(主要指cpu core和memory),
②、driver可以运行在master上,也可以运行worker上(根据部署模式的不同)。
③、driver首先会向集群管理者(standalone、 yarn, mesos) 申请spark应用所需的资源,也就是executor, 然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量 的executor,每个executor都占用-定数星的cpu和memory。在申请到应用所需的资源以后,driver就开始调度和执行我们编写的应用代码了。
④、driver进程 会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks, 然后将这些tasks分配到各个executor中执行。
Executor:
executor进程宿主在worker节点上,一个worker可以有多个executor。每个executor持有一个线程池,每个线程可以执行一个task, executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有 一个功能就是为应用程序中要求缓存的RDD提供内存式存储,RDD是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。