Spark 如何向工作线程发送闭包?

2024-01-15

当我编写 RDD 转换时,例如

val rdd = sc.parallelise(1 to 1000) 
rdd.map(x => x * 3)

据我了解,关闭(x => x * 3)这只是一个 Function1 需要可序列化,并且我想我在某处读过编辑:它就在文档中暗示的地方:http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark它被“发送”给工人执行。 (例如 Akka 向工作人员发送一段“可执行代码”来运行)

是这样的吗?

我参加的一次聚会上有人评论说,它实际上并没有发送任何序列化代码,但由于每个工作人员无论如何都会获得 jar 的“副本”,因此它只需要引用要运行的函数或类似的东西(但我不确定我是否正确引用了那个人的话)

我现在对它的实际工作原理感到非常困惑。

所以我的问题是

  1. 如何向工人发送转型关闭信息?通过akka连载?或者它们“已经在那里”,因为 Spark 将整个 uber jar 发送给每个工作人员(对我来说听起来不太可能......)

  2. 如果是这样,那么罐子的其余部分如何发送给工人呢?这是“cleanupClosure”在做什么吗?例如仅向工作人员发送相关字节码而不是整个 uberjar? (例如,只有闭包的依赖代码?)

  3. 总而言之,spark 是否在任何时候都会同步--jars与工人的类路径以某种方式?或者它是否向工作人员发送“适量”的代码?如果它确实发送了闭包,它们是否会被缓存以供重新计算的需要?或者每次安排任务时都会发送带有任务的闭包?抱歉,如果这是愚蠢的问题,但我真的不知道。

如果可以的话,请添加来源作为您的答案,我在文档中找不到明确的内容,而且我太谨慎了,无法仅通过阅读代码来得出结论。


闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现 Closure Not Serialized 异常的实例。有一个复杂的代码称为

From ClosureCleaner.scala

def clean(
    closure: AnyRef,
    checkSerializable: Boolean = true,
    cleanTransitively: Boolean = true): Unit = {
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

试图缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。

这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:

  private def ensureSerializable(func: AnyRef) {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 如何向工作线程发送闭包? 的相关文章

  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 在 IntelliJ 中运行 Spark 字数统计

    我花了几个小时浏览 You Tube 视频和教程 试图了解如何在 Scala 中运行 Spark 字数统计程序 并将其转换为 jar 文件 我现在完全糊涂了 我运行了 Hello World 并且了解了如何在 Apache spark sp
  • 行类型 Spark 数据集的编码器

    我想写一个编码器Row https spark apache org docs 2 0 0 api java index html org apache spark sql Row html输入 DataSet 用于我正在执行的地图操作 本
  • 使用 Apache Spark 读取 JSON - `corrupt_record`

    我有一个json file nodes看起来像这样 toid osgb4000000031043205 point 508180 748 195333 973 index 1 toid osgb4000000031043206 point
  • 如何在SparkR中进行map和reduce

    如何使用 SparkR 进行映射和归约操作 我能找到的只是有关 SQL 查询的内容 有没有办法使用 SQL 进行映射和减少 See 写入从 SparkR map 返回的 R 数据帧 https stackoverflow com quest
  • Apache Spark:Yarn 日志分析

    我有一个 Spark streaming 应用程序 我想使用 Elasticsearch Kibana 分析作业的日志 我的工作在纱线集群上运行 因此日志将按照我的设置写入 HDFSyarn log aggregation enable为真
  • PySpark 使用统计信息写入 Parquet 二进制列(signed-min-max.enabled)

    我找到了这张 apache parquet 票https issues apache org jira browse PARQUET 686 https issues apache org jira browse PARQUET 686被标
  • S3并行读写性能?

    考虑 Spark 或任何其他 Hadoop 框架 从 S3 读取大型 例如 1 TB 文件的场景 多个spark执行器如何从S3并行读取非常大的文件 在 HDFS 中 这个非常大的文件将分布在多个节点上 每个节点都有一个数据块 在对象存储中
  • 从数据块中的数组列获取数据,无需交叉连接

    假设我有一张桌子 id array col 101 system x value 1 system y value 2 system z value 3 其中 array col 基本上包含一个结构数组 0 系统 x 值 1 1 系统 y
  • 当我在 scala 中使用全局映射变量而不广播时会发生什么

    在 scala 中 当我在 scala 中使用全局映射变量而不进行广播时会发生什么 例如 如果我使用变量collect 例如collectAsMap 看来它是一个全局变量 我可以在所有地方使用它RDD mapValues 函数无需显式广播它
  • 与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃

    Let config json是一个小的 json 文件 toto 1 我编写了一个简单的代码来读取 json 文件sc textFile 因为文件可以在S3 本地或HDFS上 所以textFile很方便 import org apache
  • 检查 pyspark df 列的值是否存在于其他 pyspark df 列中

    我有 2 个 pyspark 数据帧 我想检查一列的值是否存在于另一个数据帧的列中 我只看到了如何过滤存在的值的解决方案 像这样 https stackoverflow com questions 41775281 filtering a
  • Spark UDF 错误 - 不支持 Any 类型的架构

    我正在尝试创建一个 udf 它将列中的负值替换为 0 我的数据框名为 df 包含一列名为 avg x 这是我创建 udf 的代码 val noNegative udf avg acc x Double gt if avg acc x lt
  • 仅使用 Spark ML Pipelines 进行转换

    我正在开发一个项目 其中可配置的管道和 Spark DataFrame 更改的沿袭跟踪都是必不可少的 该管道的端点通常只是修改后的 DataFrame 将其视为 ETL 任务 对我来说最有意义的是利用现有的 Spark ML Pipelin
  • SQL 类似于 PySpark 数据帧的 NOT IN 子句

    例如 在 SQL 中 我们可以这样做select from table where col1 not in A B 我想知道是否有一个与此等效的 PySpark 我能够找到isin类似于 SQL 的函数IN条款 但没有任何内容NOT IN
  • Spark中DataFrame、Dataset、RDD的区别

    我只是想知道有什么区别RDD and DataFrame Spark 2 0 0 DataFrame 只是一个类型别名Dataset Row 在阿帕奇火花 你能将其中一种转换为另一种吗 首先是DataFrame是从SchemaRDD 是的
  • 为什么spark在sql查询末尾附加'WHERE 1=0'

    我正在尝试使用 Apache Spark 执行简单的 mysql 查询并创建一个数据框 但由于某些原因 Spark 附加 WHERE 1 0 在我想要执行的查询末尾并抛出异常说明 You have an error in your SQL

随机推荐

  • 是否可以对网页中的 iframe 进行截图?

    我正在尝试截取网页中 iframe 的屏幕截图 在我的特定情况下 iframe 包含我的一位客户商店的街景视图 据我搜索和阅读 我没有找到任何解决方案 我知道有像这样的 JavaScript 库Html2Canvas and Canvas2
  • 如何将数据库从一台 MongoDB 服务器复制到另一台服务器?

    我在不同的服务器上有两个 mongodb 都以 auth 现在我想将数据库从一台服务器复制到另一台服务器 gt mongo gt use admin gt db copyDatabase mydb mydb another server 表
  • Python中Matlab的datenum(datestring)的等价函数

    在 Matlab 中 当我运行 datenum http de mathworks com help matlab ref datenum html 功能如下 datenum 1970 1 1 我得到以下输出 719529 我试图找到等效的
  • 如何升级 Windows 10 中的 Python 安装?

    我的一个 LAB 工作站上安装了 Python 2 7 11 我想将 Python 至少升级到 3 5 我该怎么做呢 我是否应该完全卸载 2 7 11 而不是安装新版本 有办法更新吗 更新是个好主意吗 Python 的每个次要版本 即任何
  • 如何重置 COMP_WORDBREAKS 而不影响其他完成脚本?

    当我实现 bash 自动完成功能时 有些事情让我感到困惑 我将把它放入 etc bash completion d 为了实现某些功能 我想删除分词字符冒号 来自变量 COMP WORDBREAKS并添加斜杠 开始于 COMP WORDBRE
  • 如何使用 iGraph 在 R 中挖掘主题

    我正在尝试使用该包在 R 中挖掘 3 节点图案igraph 我想检索图中每个单独顶点的图案数量 这在 graph motifs 函数中似乎不可能 因此 对于示例图 testGraph barabasi game 10 m 5 power 2
  • 将 TFS 项目转换为 git 存储库的最佳方法是什么

    我知道 VS2012 中对 git 的支持已经有了很多进展 我们目前有一个 Team Foundation Server 2012 更新 2 其中我们的所有项目都以经典 TFS 格式存储 我们希望从 TFS 迁移到纯 git repo 系统
  • 添加内存地址错误

    这无法在 VSC 2008 中编译 void toSendMemory2 toSendMemory 4 我不知道为什么 尽管我确信我这样做很愚蠢 P 当你添加N to a T 指针将增加sizeof T N bytes sizeof voi
  • Visual Studio 2013 C++ lambda 捕获参数包

    目前 Visual Studio 2013 update 2 不支持完整的 C 11 其中一项功能是捕获 lambda 中的参数包 有没有一种简单的方法可以解决这个问题 或者我是否必须放弃 Visual Studio 并使用兼容的编译器 例
  • IE7:如何让TD浮动?

    我想要一套 td s 在 IE7 中向左浮动 如果窗口太小 它们应该中断到下一行 CSS table width 100 td border 1px solid red tr f td width 500px float left HTML
  • 托管 Angular 2 应用程序

    我是新来的Angular 2 我认识楼主Angular 1 x在共享主机上 例如GoDaddy 但我不知道如何发布Angular 2应用程序 例如我有这个结构文件夹 angular2 quickstart app app component
  • 如何使用所有模型的通用 Trait 在 Laravel 中实现 eloquent 事件

    我在用拉拉维尔 5 4创建一个网络应用程序 我创建了一个特征来实现创建 更新 删除和恢复雄辩事件的事件 我创建了一个trait如下
  • 如何使用 AngularJS 更改一个 div 上的类,同时将鼠标悬停在另一个 div 上?

    我想使用 AngularJS 指令更改一个 div 的类 同时将鼠标悬停在另一个 div 上 这是我到目前为止所拥有的http jsfiddle net E8nM5 38 http jsfiddle net E8nM5 38 HMTL di
  • 当indexedDB被阻止时应用程序应该如何反应

    我在另一个地方被告知question https stackoverflow com questions 39997018关于检测阻止和解除阻止事件 阻止的打开 或删除 不会被取消 只是 被阻止 一旦解除阻止 打开 或删除 将继续 我想知道
  • Wcf 基本身份验证

    通过简单的测试 Wcf 服务使用基本身份验证时遇到一些问题 我遇到了一个例外 无法激活请求的服务 http qld tgower test Service svc 有关详细信息 请参阅 gt 服务器的诊断跟踪日志 在跟踪日志中它显示 在主机
  • WPF DataGrid实际ColumnHeaderHeight

    当我将 WPF DataGrid 的 ColumnHeaderHeight 设置为 Auto double NaN 时 如何获取列标题的实际呈现高度 我似乎无法在 DataGrid 类中找到该属性 您可以通过在视觉树中搜索来获取它DataG
  • 按照教程 AWS Elastic Beanstalk 的 Flask 教程时出现错误“Your requests.txt is invalid”

    我正在关注 AWS Elastic Beanstalk 的烧瓶教程 http docs aws amazon com elasticbeanstalk latest dg create deploy python flask html部署示
  • 用于确定测试成绩通过/失败的 MIPS 程序

    我正在编写一个 MiPS 程序 该程序将检查 15 个测试分数的列表 它将从终端输入 通过标准是 50 分 终端的输出将包括每个类别的分数以及通过和失败的学生人数 我应该使用输入提示和输出语句 请我需要一些帮助 只需要一些建议如何去做 ma
  • 禁用 GridView 列调整大小

    有什么方法可以在 WPF 中禁用 GridViewColumn 调整大小吗 我不想设置控件的样式 请参阅此链接 ListView 中的固定宽度列 无法调整大小的列 http blogs msdn com b atc avalon team
  • Spark 如何向工作线程发送闭包?

    当我编写 RDD 转换时 例如 val rdd sc parallelise 1 to 1000 rdd map x gt x 3 据我了解 关闭 x gt x 3 这只是一个 Function1 需要可序列化 并且我想我在某处读过编辑 它