为什么非常大的 Spark 阶段不使用所有可用的执行器?

2024-03-04

我正在运行一个包含一些非常大的阶段(例如 >20k 任务)的 Spark 作业,并使用 1k 到 2k 执行器运行它。

在某些情况下,阶段似乎运行不稳定:随着时间的推移,许多可用的执行器变得空闲,尽管仍然处于有许多未完成任务的阶段中间。从用户的角度来看,任务似乎正在完成,但已完成给定任务的执行者不会获得分配给他们的新任务。结果,该阶段花费的时间比应有的时间长,并且大量执行器 CPU 时间浪费在空闲上。这似乎主要(仅?)发生在从 HDFS 读取数据的输入阶段。

不稳定期间的 Spark stderr 日志示例 - 请注意,正在运行的任务数量随着时间的推移而减少,直到几乎达到零,然后突然跳回到 >1k 正在运行的任务:

[Stage 0:==============================>                 (17979 + 1070) / 28504]
[Stage 0:==============================>                 (18042 + 1019) / 28504]
[Stage 0:===============================>                 (18140 + 921) / 28504]
[Stage 0:===============================>                 (18222 + 842) / 28504]
[Stage 0:===============================>                 (18263 + 803) / 28504]
[Stage 0:===============================>                 (18282 + 786) / 28504]
[Stage 0:===============================>                 (18320 + 751) / 28504]
[Stage 0:===============================>                 (18566 + 508) / 28504]
[Stage 0:================================>                (18791 + 284) / 28504]
[Stage 0:================================>                (18897 + 176) / 28504]
[Stage 0:================================>                (18940 + 134) / 28504]
[Stage 0:================================>                (18972 + 107) / 28504]
[Stage 0:=================================>                (19035 + 47) / 28504]
[Stage 0:=================================>                (19067 + 17) / 28504]
[Stage 0:================================>               (19075 + 1070) / 28504]
[Stage 0:================================>               (19107 + 1039) / 28504]
[Stage 0:================================>                (19165 + 982) / 28504]
[Stage 0:=================================>               (19212 + 937) / 28504]
[Stage 0:=================================>               (19251 + 899) / 28504]
[Stage 0:=================================>               (19355 + 831) / 28504]
[Stage 0:=================================>               (19481 + 708) / 28504]

这就是阶段稳定运行时 stderr 的样子——正在运行的任务数量大致保持不变,因为新任务会在执行器完成之前的任务时分配给它们:

[Stage 1:===================>                            (11599 + 2043) / 28504]
[Stage 1:===================>                            (11620 + 2042) / 28504]
[Stage 1:===================>                            (11656 + 2044) / 28504]
[Stage 1:===================>                            (11692 + 2045) / 28504]
[Stage 1:===================>                            (11714 + 2045) / 28504]
[Stage 1:===================>                            (11741 + 2047) / 28504]
[Stage 1:===================>                            (11771 + 2047) / 28504]
[Stage 1:===================>                            (11818 + 2047) / 28504]

在什么情况下会发生这种情况,我该如何避免这种行为?

注意:我正在使用动态分配,但我很确定这与这个问题无关——例如,在不稳定时期,在 Spark 应用程序主 UI 中,我可以看到预期的执行器数量是“活动的”,但是没有运行“活动任务”。


当每个任务花费的时间非常少时,我在 Spark 中看到过这样的行为。由于某种原因,调度程序似乎假设作业将更快地完成,而无需额外的分配开销,因为每个任务都完成得如此之快。

有几点值得尝试:

  • Try .coalesce()减少分区的数量,以便每个分区需要更长的时间来运行(当然,这可能会导致洗牌步骤,并可能增加总体作业 时间,你必须实验)
  • 调整spark.locality.wait*设置here https://spark.apache.org/docs/latest/configuration.html#scheduling。如果每个任务花费的时间少于默认等待时间3s,那么调度程序可能只是试图保持现有插槽已满,而永远没有机会分配更多插槽。

我还没有追查到exactly是什么导致了这个问题,所以这些只是基于我自己在我自己的(小得多的)集群中的观察的猜测和预感。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么非常大的 Spark 阶段不使用所有可用的执行器? 的相关文章

  • 如何使用 Apache Livy 设置 Spark 配置属性?

    我不知道在向 Apache Livy 提交 Spark 作业时如何以编程方式传递 SparkSession 参数 这是测试 Spark 作业 class Test extends Job Int override def call jc J
  • Delta Lake 独立于 Apache Spark?

    我一直在探索数据湖屋概念和 Delta Lake 它的一些功能看起来真的很有趣 就在项目主页上https delta io https delta io 有一个图表显示 Delta Lake 运行在 您现有的数据湖 上 但没有提及 Spar
  • Python Spark DataFrame:用 SparseVector 替换 null

    在 Spark 中 我有以下名为 df 的数据框 其中包含一些空条目 id features1 features2 185 5 0 1 4 0 1 0 null 220 5 0 2 3 0 1 0 10 1 2 6 0 1 225 null
  • 异常:java.lang.Exception:使用 master 'yarn' 运行时,必须在环境中设置 HADOOP_CONF_DIR 或 YARN_CONF_DIR。在火花中

    我是新的阿帕奇火花 我已经在spark独立模式下测试了一些应用程序 但我想运行应用程序yarn模式 我正在windows中运行apache spark 2 1 0 这是我的代码 c spark gt spark submit2 master
  • 如何更改 SparkContext.sparkUser() 设置(在 pyspark 中)?

    我是新来的Spark and pyspark 我使用 pyspark 之后我rdd处理中 我试图将其保存到hdfs使用saveAsTextfile 功能 但我得到一个 没有权限 错误消息 因为 pyspark 尝试写入hdfs使用我的本地帐
  • 如何过滤 pyspark 列表中值的列?

    我有一个数据框原始数据 我必须在 X 列上应用值 CB CI 和 CR 的过滤条件 所以我使用了下面的代码 df dfRawData filter col X between CB CI CR 但我收到以下错误 Between 恰好需要 3
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • Spark 执行器 STDOUT 到 Kubernetes STDOUT

    我在 Spark Worker 中运行的 Spark 应用程序将执行程序日志输出到特定文件路径 worker home directory app xxxxxxxx 0 stdout I used log4j properties将日志从
  • 从 pandas udf 记录

    我正在尝试从 python 转换中调用的 pandas udf 进行日志记录 因为在执行器上调用的代码不会显示在驱动程序的日志中 我一直在寻找一些选项 但到目前为止最接近的选项是这个one https stackoverflow com q
  • 列对象不可调用 Spark

    我尝试安装 Spark 并运行教程中给出的命令 但出现以下错误 https spark apache org docs latest quick start html https spark apache org docs latest q
  • Kubernetes WatchConnectionManager:执行失败:HTTP 403

    我遇到错误Expected HTTP 101 response but was 403 Forbidden 在我使用以下命令设置新的 Kubernetes 集群之后Kubeadm当我提交下面遇到的 pyspark 示例应用程序时 只有一个主
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • 过滤字符串上的 Spark DataFrame 包含

    我在用火花1 3 0 http spark apache org releases spark release 1 3 0 html and 火花阿夫罗1 0 0 https github com databricks spark avro
  • 更改 Spark SQL 中的 Null 顺序

    我需要能够按升序和降序对列进行排序 并且还允许空值位于第一个或空值位于最后一个 使用 RDD 我可以将 sortByKey 方法与自定义比较器结合使用 我想知道是否有使用 Dataset API 的相应方法 我了解如何将 desc asc
  • Spark SQL 失败,因为“常量池已超过 JVM 限制 0xFFFF”

    我在 EMR 4 6 0 Spark 1 6 1 上运行此代码 val sqlContext SQLContext getOrCreate sc val inputRDD sqlContext read json input try inp
  • 如何将包含多个字段的大型 csv 加载到 Spark

    新年快乐 我知道以前曾提出 回答过此类类似的问题 但是 我的问题有所不同 我有大尺寸的 csv 有 100 个字段和 100MB 我想将其加载到 Spark 1 6 进行分析 csv 的标题看起来像附件sample http www roc
  • 火花内存不足

    我有一个文件夹 里面有 150 G 的 txt 文件 大约 700 个文件 平均每个 200 MB 我使用 scala 来处理文件并最终计算一些汇总统计数据 我认为有两种可能的方法可以做到这一点 手动循环所有文件 对每个文件进行计算并最终合
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr

随机推荐

  • 如何停止反应原生动画

    我试图在本机反应中停止动画 但它不起作用 我尝试这样做停止动画方法 https facebook github io react native docs animated html stopanimation 这是我的代码 construc
  • 如何使用 Spark 查找中位数和分位数

    我怎样才能找到一个中位数RDD使用分布式方法 IPython 和 Spark 计算整数 这RDD大约有 700 000 个元素 因此太大而无法收集和查找中位数 这个问题与这个问题类似 如何使用 Apache Spark 计算精确中位数 ht
  • 如何在新窗口中打开链接?

    我有一个特定链接的点击处理程序 在其中我想做类似以下的事情 window location url 我需要这个才能在新窗口中实际打开网址 我该怎么做 您可以喜欢 window open url window name window sett
  • Java 并行流的性能影响

    使用的最佳实践是什么 stream parallel 例如 如果您有一堆阻塞 I O 调用 并且您想要检查是否 anyMatch 并行执行此操作似乎是明智的做法 示例代码 public boolean hasAnyRecentReferen
  • 在 C# 中删除继承对象的属性

    如果我有一个复杂的对象 我可以继承它并remove or ignore某些属性 如果你不在乎why我想这样做 请随意提交答案 如果您关心的话 可以阅读这个问题 https stackoverflow com questions 860786
  • 无法让 php exec 工作

    我已经为此奋斗了几个小时 但似乎无法解决 尝试了 exec shell exec 和 system 什么都不起作用 我有这个 exec usr bin php var www vhosts domain com httpdocs shell
  • 机器模式下mret和ret指令有什么区别?

    当RISC V核心工作在机器模式时 mret和ret指令有什么区别吗 ret is a pseudoinstruction which actually is a jalr instruction while mret is a real
  • 如何实例化相互依赖的类?

    我有一个PlayoffCreator类来创建季后赛比赛 这个类有一个Bracket生成括号结构的实例 该结构中的每个匹配都是一个Node 由该类的两个实例组成Element 然后PlayoffCreator经历每一个Node the Bra
  • 如何向pandas工具包代理添加会话记忆?

    我想添加一个ConversationBufferMemory to pandas dataframe agent但到目前为止我还没有成功 我尝试通过构造函数添加内存 create pandas dataframe agent llm df
  • HDFS 错误:只能复制到 0 个节点,而不是 1 个

    我在 EC2 中创建了一个 ubuntu 单节点 hadoop 集群 测试将简单文件上传到 hdfs 可以在 EC2 计算机上进行 但不能在 EC2 外部的计算机上进行 我可以通过远程计算机的 Web 界面浏览文件系统 它显示一个报告为正在
  • 我如何解码这个恶意软件 PHP 脚本? [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 我只是注意
  • 文本未出现在 XTS 图上

    我在使用 R 向时间序列数据图中添加一些文本时遇到问题xts 我已经制作了一个简单的问题示例 My text 命令似乎什么也没做 而我可以在图中添加一个点 我尝试尽可能使用默认值来保持代码简单 require quantmod fetch
  • intentService :为什么我的 onHandleIntent 从未被调用?

    我正在使用 android xml rpc 来安装服务器 为此 我正在使用intentService 唯一的问题是 当启动服务器类时 我的包含服务器的 onHandleIntent 永远不会被调用 我做了一些研究 发现有人有同样的问题 他设
  • 为 sklearn 梯度增强分类器设置自定义损失

    Sklearn 梯度增强分类器接受偏差和指数损失 详见here https scikit learn org stable modules ensemble html gradient boosting and here https sci
  • Java 性能技巧

    我有一个程序从 C 移植到 Java 这两个应用程序都使用快速排序来排序一些分区数据 基因组坐标 Java 版本运行速度很快 但我想让它更接近 C 版本 我使用的是 Sun JDK v6u14 显然 我无法与 C 应用程序相媲美 但我想了解
  • 如何在 python 中合并列表? [复制]

    这个问题在这里已经有答案了 我有 2 个列表 例如 1 2 3 和 4 5 6 如何将它们合并到 1 个新列表中 1 2 3 4 5 6 不是 1 2 3 4 5 6 运算符可用于合并两个列表 data1 1 2 3 data2 4 5 6
  • Angular 2 下拉菜单中的 bootstrap 4 不起作用

    我按照以下步骤将 bootstrap 4 安装到我的 Angular 2 项目中 接受答案 遵循前 1 2 3 和 4 个步骤 https stackoverflow com questions 37649164 how to add bo
  • 在 SpriteKit 中预加载纹理

    我做了一些研究 但似乎找不到任何可以清楚解释如何在动画中预加载单个纹理和纹理的内容 我目前正在使用Atlas s in Assets xcassets对相关的动画图像进行分组 我的图像出现在 Atlas 中是否意味着它们已被预加载 就单个图
  • Bootstrap 轮播图像未以全宽显示

    我正在创造一个简单的 HTML 模板 http zulhfreelancer com projects omar 使用Bootstrap 3 我在中等尺寸显示中检查模板 看起来不错 但是当我在更大的显示器 和更高分辨率 中检查它时 我在轮播
  • 为什么非常大的 Spark 阶段不使用所有可用的执行器?

    我正在运行一个包含一些非常大的阶段 例如 gt 20k 任务 的 Spark 作业 并使用 1k 到 2k 执行器运行它 在某些情况下 阶段似乎运行不稳定 随着时间的推移 许多可用的执行器变得空闲 尽管仍然处于有许多未完成任务的阶段中间 从