Spark(七)——累加器和广播变量

2023-11-16

5、累加器

通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。 Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add)增加累加器的值。 
 驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。 注意:工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。转化操作中累加器可能会发生不止一次更新。

object SparkCoreDemo13_Accumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Demo13")
    val sc = new SparkContext(conf)
​
    var count = 0
    // 1. 创建累加器对象
    val acc = new LongAccumulator
    val myAcc = new MyAccumulator
    // 2. 使用sc注册累加器
    sc.register(acc)
    sc.register(myAcc)
​
    val rdd = sc.makeRDD(1 to 10)
​
    //    val sum = rdd.reduce(_ + _)
    //    val count1 = rdd.count()
    //    sum / count1.toDouble
​
    val rdd1 = rdd.map(x => {
      count += 1
      // 3. 使用累加器对象进行数据的添加
      acc.add(1)
      myAcc.add(x)
      println(s"acc value in map: ${acc.value} --" + Thread.currentThread().getId)
      println(s"count in map: $count---" + Thread.currentThread().getId)
      x + 1
    })
​
    println(rdd1.collect().toList)
​
    // 4. 使用value()获取累加器的值
    println(s"acc value in main: ${acc.value} --" + Thread.currentThread().getId)
    println(s"count in main: $count--" + Thread.currentThread().getId)
    // 上面的代码中
    // 如果想通过创建在Driver中的局部变量统计RDD 算子的执行次数
    // 最终无法获取到执行次数
    //   因为RDD的算子操作是在Driver中进行编译
    //    并真正提交到执行器(Executor)中的任务线程(Task)中执行
    //     每个线程(Task)都会保有一份属于自己线程的局部变量
    //     最终Driver程序中的局部变量没有参与任何运算
​
    // Spark提供了Accumulator 累加器对象  用于方便的进行分布式聚合(计数)
​
    // AccumulatorV2
    //    add(对象)  将对象添加到累加器中
    //   对象 =  value()  获取累加器中的值
​
    println(myAcc.value)
​
  }
}

自定义累加器

//                               [IN,OUT] 累加器的输入对象
//                                        累加器的输出对象
class MyAccumulator extends AccumulatorV2[Int, Double] {
  // 创建成员属性用于记录当前累加器的值
  var count: Long = 0L
  var sum: Long = 0L
​
  /**
   * 用于判断当前累加器是否为初始状态
   *
   * @return
   */
  override def isZero: Boolean = this.count == 0 && this.sum == 0
​
  /**
   * 复制当前累加器的状态
   *
   * @return
   */
  override def copy(): AccumulatorV2[Int, Double] = {
    val accumulator = new MyAccumulator
    accumulator.count = this.count
    accumulator.sum = this.sum
    accumulator
  }
​
  /**
   * 重置当前累加器的值
   */
  override def reset(): Unit = {
    this.count = 0
    this.sum = 0
  }
​
  /**
   * 将传入的对象添加到当前的累加器值中
   *
   * @param v
   */
  override def add(v: Int): Unit = {
    this.count += 1
    this.sum += v
  }
​
  /**
   * 将其他分区的累加器传入merge 并将所有累加器的值进行合并
   *
   * @param other 其他分区的累加器
   */
  override def merge(other: AccumulatorV2[Int, Double]): Unit = {
    val o = other.asInstanceOf[MyAccumulator]
    this.count += o.count
    this.sum += o.sum
  }
​
  /**
   * 返回当前累加器的值
   *
   * @return
   */
  override def value: Double = this.sum.toDouble / this.count
}

6、广播变量

使用广播变量的过程如下:

(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。

(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。

(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

package com.zch.spark.core
​
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
​
import scala.io.Source
​
/**
 * Created with IntelliJ IDEA.
 * Author: Amos
 * E-mail: amos@amoscloud.com
 * Date: 2021/12/14
 * Time: 9:22
 * Description: 
 */
object SparkCoreDemo14_BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Demo14")
    val sc = new SparkContext(conf)
​
​
    // 加载黑名单文件放入集合
    val source = Source.fromFile("C:\\Users\\Amos\\Desktop\\blackList.txt")
    //   文件大小1GB
    val blkList: List[String] = source.getLines().toList
    source.close()
​
    // 1. 创建广播对象
    val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)
​
    // 加载日志数据创建RDD
    val rdd = sc.textFile("C:\\Users\\Amos\\Desktop\\weblog\\access.log-20211107")
    // 将日志数据通过处理得到  (ip,是否为黑名单用户)
    rdd
      .repartition(10)
      .map(line => {
        val ip = line.split(" ").head
        //2. 需要使用时  从公共缓存中读取对象
        val list = bc_blkList.value
        (ip, if (list.contains(ip)) 1 else 0)
      })
      .foreach(println)
​
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark(七)——累加器和广播变量 的相关文章

  • OceanBase 4.1解读:我们想给用户一个开箱即用的OceanBase部署运维工具

    欢迎访问 OceanBase 官网获取更多信息 https www oceanbase com 关于作者 肖磊 OceanBase 产品专家 负责 OceanBase 运维管控体系产品规划与设计 包括安装部署工具 OBD OAT 运维管控平
  • 小程序领取微信卡券

    小程序领取微信卡券 获取 access token api ticket singuare等完整版 转载自 感谢这位作者 https www cnblogs com w53064 p 9771232 html 返回主页 啊傑Plus 博客园
  • Java实现,手写二叉树

    一 基本概念 二叉树 每个节点最多有两个子树的结构 满二叉树 除了最后一层没有任何节点外 每一层的所有节点都有两个子节点的二叉树 完全二叉树 结构与满二叉树类似 不同点在于最后一层可以不满 但最后一层的节点必须连续集中再最左边 二叉搜索树
  • Redis高级的相关问题总结

    1 rdb和aof有什么区别 你们在工作中如何使用redis的持久化策略 1 rdb定时对整个内存做快照 aof记录每一次执行的命令 2 rdb两次备份之间会丢失数据 aof取决于刷盘策略 相对比较完整 3 rdb宕机恢复速度快 aof恢复

随机推荐

  • XSL-FO Blocks(块)

    XSL FO 的输出位于块区域中 XSL FO 页面 流以及块 内容 块 会 流 入 页面 中 然后输出到媒介 XSL FO 输出通常被嵌套在
  • 监控神器-普罗米修斯Prometheus的安装

    最近看了些AIOPS的资料 对于里面提及的一个普罗米修斯Prometheus起了兴趣 首先是联想到异形 哈哈 去看了一下 普罗米修斯还真是厉害 而且还是开源的 真是搬砖党的福音 功能 在业务层用作埋点系统 Prometheus支持多种语言
  • 面向对象和面向过程的区别,Java为什么说它性能低

    面向对象相比于面向过程 面向对象的代码延展性更好 拿上面的例子举例如果现在要把存储的对象不是大象了而是狮子 那么很明显 面向过程要的方法要所有大象为狮子 面向对象的方法只要改一下关于这个改动的方法也就是 存储 内容 不仅仅是修改 添加删除等
  • C++:模拟实现string类

    文章目录 Iterator类 capacity类 Element access 类 Modifiers类 String operations类 类外成员函数 头文件总括 本篇主要介绍模拟实现string类 string中有相当多的内容 这里
  • AIX6.1 源码编译方式安装 zabbix3.4 代理

    1 aix机器上创建zabbix用户 mkuser zabbix 2 上传zabbix 3 4 11 targz 3 解压 gunzip zabbix 3 4 11 tar gz tar xvf zabbix 3 4 11 tar 4 安装
  • C语言实现DFT计算

    文章目录 一 DFT计算公式 二 DFT程序实现 一 DFT计算公式 这里就不对DFT概念进行叙述 直接上计算公式 其中N为DFT点数 公式如此 但是在程序中并非如此运算 而是利用欧拉公式对DFT的计算公式进行了转化 转换后公式变为 利用转
  • 为什么在控制台输入var name = Symbol();会报错?VM3436:1 Uncaught TypeError: Cannot convert a Symbol value to a str

    问题复现 var name Symbol VM3436 1 Uncaught TypeError Cannot convert a Symbol value to a string 解释每一个部分 var 定义的变量会把它提升到当前函数作用
  • 踩坑vue中嵌套iframe项目,嵌套在iframe中的项目无法登录!

    解决方案原文 这个我试了下是谷歌浏览器做了限制 在edge上可以正常登录 我遇到这种情况主要是我的项目用的是cookie存储的登录状态 需要设置cookie的域名 使其在嵌入的网站和网站域名下都能访问cookie 但是我设置了还是没用 我最
  • go cli脚手架开发利器——cobra库的初体验

    文章目录 关于 说明 cobra 简介 cobra 概念 Commands Flags Args 教程正文 demo1 快速了解 demo 知识点 Command创建命令 demo2 使用参数验证器 钩子函数 demo 知识点 参数验证方法
  • 最新xmind2022版思维导图如何使用详解教程

    前段时间和大家盘点了五种高效的学习方法 没有看过的小伙伴戳这里 最科学的学习方法盘点 讲到思维导图笔记法的时候 本狗子给大家推荐了一款免费的思维导图软件 xmind软件 然后好多小伙伴都加我问关于思维导图的使用方法 于是今天我就做了一份关于
  • 微信小程序编译bug---Hbuilderx编译时一直卡在编译界面

    项目场景 微信小程序新增某功能模块 问题描述 使用Hbuilderx开发微信小程序 正常开发中 在某次重新保存代码并编译后一直卡在编译界面 如图 图为Hbuilderx界面 因为此时没有编译完成 所以微信开发者工具界面一片空白 报错为 ap
  • Python,OpenCV使用KNN来构建手写数字及字母识别OCR

    Python OpenCV使用KNN来构建手写数字及字母识别OCR 1 原理 1 1 手写数字识别 1 2 字母识别 2 源码 2 1 手写数字OCR 2 2 字母OCR 参考 这篇博客将介绍如何借助OpenCV提供的手写数字及字母数据集
  • Linux用户空间与内核空间

    Linux用户空间与内核空间 2012 08 30 15 39 1969人阅读 评论 1 收藏 举报 linux linux内核 struct user system allocation Linux 操作系统和驱动程序运行在内核空间 应用
  • 2.Xaml 停靠框架

    1 运行效果图片 2 Xaml程序
  • ChatGPT实现知识图谱生成

    知识图谱生成 在之前章节中 我们尝试过让 ChatGPT 对一段文本做实体识别和词性分析 结果很不错 但如果是需要长期留存下来 后续在不同场景下快速查询分析 最好还是要把数据存入到专门的图数据库中 才能方便随时读取 本节 我们试试让 Cha
  • 《计算机网络原理》(谢希仁)笔记——第二章

    此为本人观看韩立刚老师视频所做笔记与总结 下面为视频连接 https www bilibili com video av10921041 from search seid 733222547867341420 第二章 物理层 主要知识点为数
  • Verilog对数据进行四舍五入(round)与饱和(saturation)截位

    重点 1 正数截位 直接看截掉的最高位是不是一 是的话进一 负数截位 截的最高位为1且其它位不全是0进一 2 饱和 也就是大于求的结果 整数变为符号位为0 其它位为1 负数变成第一位为1 其它位为0 一 引言 在利用Verilog写数字信号
  • Python入门到放弃(一)

    介绍python和库文件管理 python是解释型语言 Python的特点 简单 易学 速度快 免费开源 高层语言 可移植性 解释性 面向对象 也支持面向过程 可扩展性 可嵌入性 丰富的库 规范的代码 在cmd中退出python 输入exi
  • Tomcat优化相关问题

    1 你怎样给 tomcat 去调优 JVM 参数调优 Xms 表示 JVM 初始化堆的大小 Xmx表示 JVM 堆的最大值 这两个值的大小一般根据需要进行设置 当应用程序需要的内存超出堆的最大值时虚拟机就会提示内存溢出 并且导致应用服务崩溃
  • Spark(七)——累加器和广播变量

    5 累加器 通过在驱动器中调用SparkContext accumulator initialValue 方法 创建出存有初始值的累加器 返回值为org apache spark Accumulator T 对象 其中 T 是初始值 ini