5、累加器
通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。 Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。
驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。 注意:工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。转化操作中累加器可能会发生不止一次更新。
object SparkCoreDemo13_Accumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Demo13")
val sc = new SparkContext(conf)
var count = 0
// 1. 创建累加器对象
val acc = new LongAccumulator
val myAcc = new MyAccumulator
// 2. 使用sc注册累加器
sc.register(acc)
sc.register(myAcc)
val rdd = sc.makeRDD(1 to 10)
// val sum = rdd.reduce(_ + _)
// val count1 = rdd.count()
// sum / count1.toDouble
val rdd1 = rdd.map(x => {
count += 1
// 3. 使用累加器对象进行数据的添加
acc.add(1)
myAcc.add(x)
println(s"acc value in map: ${acc.value} --" + Thread.currentThread().getId)
println(s"count in map: $count---" + Thread.currentThread().getId)
x + 1
})
println(rdd1.collect().toList)
// 4. 使用value()获取累加器的值
println(s"acc value in main: ${acc.value} --" + Thread.currentThread().getId)
println(s"count in main: $count--" + Thread.currentThread().getId)
// 上面的代码中
// 如果想通过创建在Driver中的局部变量统计RDD 算子的执行次数
// 最终无法获取到执行次数
// 因为RDD的算子操作是在Driver中进行编译
// 并真正提交到执行器(Executor)中的任务线程(Task)中执行
// 每个线程(Task)都会保有一份属于自己线程的局部变量
// 最终Driver程序中的局部变量没有参与任何运算
// Spark提供了Accumulator 累加器对象 用于方便的进行分布式聚合(计数)
// AccumulatorV2
// add(对象) 将对象添加到累加器中
// 对象 = value() 获取累加器中的值
println(myAcc.value)
}
}
自定义累加器
// [IN,OUT] 累加器的输入对象
// 累加器的输出对象
class MyAccumulator extends AccumulatorV2[Int, Double] {
// 创建成员属性用于记录当前累加器的值
var count: Long = 0L
var sum: Long = 0L
/**
* 用于判断当前累加器是否为初始状态
*
* @return
*/
override def isZero: Boolean = this.count == 0 && this.sum == 0
/**
* 复制当前累加器的状态
*
* @return
*/
override def copy(): AccumulatorV2[Int, Double] = {
val accumulator = new MyAccumulator
accumulator.count = this.count
accumulator.sum = this.sum
accumulator
}
/**
* 重置当前累加器的值
*/
override def reset(): Unit = {
this.count = 0
this.sum = 0
}
/**
* 将传入的对象添加到当前的累加器值中
*
* @param v
*/
override def add(v: Int): Unit = {
this.count += 1
this.sum += v
}
/**
* 将其他分区的累加器传入merge 并将所有累加器的值进行合并
*
* @param other 其他分区的累加器
*/
override def merge(other: AccumulatorV2[Int, Double]): Unit = {
val o = other.asInstanceOf[MyAccumulator]
this.count += o.count
this.sum += o.sum
}
/**
* 返回当前累加器的值
*
* @return
*/
override def value: Double = this.sum.toDouble / this.count
}
6、广播变量
使用广播变量的过程如下:
(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。
(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
package com.zch.spark.core
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
/**
* Created with IntelliJ IDEA.
* Author: Amos
* E-mail: amos@amoscloud.com
* Date: 2021/12/14
* Time: 9:22
* Description:
*/
object SparkCoreDemo14_BroadcastVariable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Demo14")
val sc = new SparkContext(conf)
// 加载黑名单文件放入集合
val source = Source.fromFile("C:\\Users\\Amos\\Desktop\\blackList.txt")
// 文件大小1GB
val blkList: List[String] = source.getLines().toList
source.close()
// 1. 创建广播对象
val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)
// 加载日志数据创建RDD
val rdd = sc.textFile("C:\\Users\\Amos\\Desktop\\weblog\\access.log-20211107")
// 将日志数据通过处理得到 (ip,是否为黑名单用户)
rdd
.repartition(10)
.map(line => {
val ip = line.split(" ").head
//2. 需要使用时 从公共缓存中读取对象
val list = bc_blkList.value
(ip, if (list.contains(ip)) 1 else 0)
})
.foreach(println)
}
}