Spark在进行计算的时候通常会包含以下几个步骤
- 创建SparkContext上下文对象
- 使用SparkContext加载数据创建RDD
- RDD的转换算子transfotmations
- RDD的行动算子actions
- RDD的缓存和持久化
1.创建SparkContext上下文对象
// SparkContext
// 创建SparkConf对象用于配置参数
val conf = new SparkConf
// 连接集群
// conf.setMaster("spark://host02:7077")
// 本地测试时可以使用local作为master
// 设置本地模式运行spark程序 允许使用2个cpu核心
conf.setMaster("local[2]")
// 设置应用程序的名称
conf.setAppName("WordCount")
//获取SparkContext对象
val sc = new SparkContext(conf)
2.使用SparkContext加载数据创建RDD
// 创建RDD
// 1. 使用scala集合创建RDD 通常用于测试
// 1.1 makeRDD
//函数声明
// def makeRDD[T: ClassTag](
// seq: Seq[T],
// numSlices: Int = defaultParallelism): RDD[T] =
val RDD: RDD[Int] = sc.makeRDD(1 to 10, 4)
val numPar = RDD.getNumPartitions
// println(numPar)
// println(rdd.count())
//1.2 parallelize
// parallelize是早期的SparkApi
// 由于单词太长,所以重新封装为makeRDD
// def parallelize[T: ClassTag](
// seq: Seq[T],
// numSlices: Int = defaultParallelism): RDD[T] =
// 2. 读取文件创建RDD
// 生产环境会将原始数据保存在HDFS上
// 使用SparkContext从HDFS读取数据创建RDD
// 2.1 读取文本文件
// 将文本文件按行读取 每行作为RDD的一个元素
val filePath = "xxxxxxx"
val RDD = sc.textFile(filePath)
// 2.2 读取SequenceFile
// Hadoop中有时会使用SequenceFile进行KV数据存储
// Spark为了兼容Hadoop提供了SequenceFile的解析方式
val RDD3: RDD[(String, String)] = sc.sequenceFile[String, String]("C:\\Users\\Amos\\Desktop\\output")
// 2.3 读取ObjectFile
sc.objectFile("C:\\Users\\Amos\\Desktop\\objectFile")
3.RDD的转换算子transfotmations
//这一步的目的就是将RDD使用转换算子处理,形成一个新的RDD传递给下一步操作
//常见的转化算子
// 1. map 映射
// 1.1 声明
// def map[U: ClassTag](f: T => U): RDD[U] =
// 1.2 参数
// f 一元函数
// f的参数是源RDD的元素类型
// f的返回值 是任意类型
// 1.3 返回值
// 一个新的RDD 泛型是f的返回值类型
// 1.4 作用
// 将源RDD中的元素依次传入f中
// 将f的返回值收集到新的RDD并返回
// 1.5 Eample
RDD.map(x=>{
val strings = x.split(" ")
strings.head
})
// 2. filter 过滤
// 2.1. 声明
// def filter(f: T => Boolean): RDD[T] =
// 2.2. 参数 源RDD的元素类型
// 2.3. 返回值 Boolean
// 2.4. 作用
// 保留满足条件的元素
// 2.5 Example
RDD.filter(x=>{
x.split(" ")(8) == "200"
})
// 3. flatMap 扁平化处理
// 3.1. 声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)