对于小数据,Spark shuffle 读取需要花费大量时间

2024-02-15

我们正在运行以下阶段的 DAG,并且对于相对较小的 shuffle 数据大小(每个任务大约 19MB),经历了较长的 shuffle 读取时间

一个有趣的方面是每个执行器/服务器内的等待任务具有相同的随机读取时间。以下是其含义的示例:对于以下服务器,一组任务等待大约 7.7 分钟,另一组任务等待大约 26 秒。

这是同一阶段运行的另一个示例。该图显示了 3 个执行器/服务器,每个执行器/服务器都有统一的任务组,并且具有相同的随机读取时间。蓝色组代表由于推测执行而被终止的任务:

并不是所有的执行者都是这样。有些任务几乎均匀地在几秒钟内完成所有任务,并且这些任务的远程读取数据大小与在其他服务器上等待很长时间的任务相同。 此外,这种类型的阶段在我们的应用程序运行时运行两次。产生这些具有大量随机读取时间的任务组的服务器/执行器在每个阶段运行中都是不同的。

以下是其中一台服务器/主机的任务统计表示例:

看起来负责这个 DAG 的代码如下:

output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
val comparison = data.union(output).except(data.intersect(output)).cache()
comparison.filter(_.abc != "M").count()

我们非常感谢您对此的想法。


显然,问题出在 JVM 垃圾收集 (GC) 上。这些任务必须等待远程执行器上的 GC 完成。等效的随机读取时间是由于多个任务正在等待执行 GC 的单个远程主机这一事实造成的。我们遵循了发布的建议here https://stackoverflow.com/questions/38981772/spark-shuffle-operation-leading-to-long-gc-pause/39111205并且问题减少了一个数量级。远程主机上的 GC 时间和本地 shuffle 读取时间之间仍然存在很小的相关性。未来我们想尝试shuffle服务。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

对于小数据,Spark shuffle 读取需要花费大量时间 的相关文章

  • 你能在 scala 中使用 varargs 柯里化一个函数吗?

    我正在考虑如何用可变参数柯里化一种方法 然后我意识到我什至不知道如何去做 理想情况下 它应该让您可以随时开始使用它 然后以可迭代结束 def concat strs String strs mkString val curriedConca
  • 使用什么框架来引导我的第一个生产 scala 项目?

    我正在第一次涉足 scala 的生产应用程序 该应用程序当前打包为 war 文件 我的计划是创建 scala 编译工件的 jar 文件 并将其添加到 war 文件的 lib 文件夹中 我的增强功能是通过 Jersey 公开的 mysql 支
  • PySpark - 系统找不到指定的路径

    Hy 我已经多次运行 Spark Spyder IDE 今天我收到这个错误 代码是相同的 from py4j java gateway import JavaGateway gateway JavaGateway os environ SP
  • 在泛型方法中返回原始集合类型

    假设我们想要创建一个像这样的函数minBy返回集合中同等极简主义的所有元素 def multiMinBy A B Ordering xs Traversable A f A gt B val minVal f xs minBy f xs f
  • 如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

    例如 这样的结果 df filter project en select title count groupBy title sum 将返回一个数组 如何将 Spark DataFrame 作为 csv 文件保存在磁盘上 Apache Sp
  • 如何使用 zio-test 测试异常情况

    我有以下功能 我想测试 def people id Int RIO R People 如果有 People 则此函数返回 Peopleid 分别 如果没有则失败 例如 IO fail ServiceException s No People
  • “为 Apache Hadoop 2.7 及更高版本预构建”是什么意思?

    Apache Spark 下载页面上的 pre built for Apache Hadoop 2 7 and later 是什么意思 这是否意味着spark中HDFS必须有库 如果是这样 其他存储系统 例如 Cassandra s3 HB
  • SBT Scaladoc 配置

    我正在尝试在 SBT 中配置 Scaladoc 特别是标题 输出目录和类路径 我通过将以下内容添加到 build sbt 来定义标题 scalacOptions in Compile doc Opts doc title Scala Too
  • 对于 Scala,“无全局类型推断”是什么意思?

    我读过 Scala 的类型推断不是全局的 因此人们必须在方法上放置类型注释 这会是 本地 类型推断吗 我只知道一点点 原因是它面向对象的本质 但我不清楚 是否有 全局类型推断 的解释以及为什么 Scala 不能让初学者可以理解 The pr
  • DataFrame 分区到单个 Parquet 文件(每个分区)

    我想重新分区 合并我的数据 以便将其保存到每个分区的一个 Parquet 文件中 我还想使用 Spark SQL partitionBy API 所以我可以这样做 df coalesce 1 write partitionBy entity
  • 使用 Reader Monad 进行依赖注入

    我最近看到了谈话极其简单的依赖注入 http www youtube com watch v ZasXwtTRkio and 无需体操的依赖注入 http vimeo com 44502327关于 Monads 的 DI 并留下了深刻的印象
  • 如何删除spark输出中的compactbuffer

    下面是我在spark shell中运行的程序 但是当我将输出保存在HDFS中时 我得到带有compactbuffer的输出 如何删除spark输出中的compactbuffer Program val a sc textFile datag
  • Spark 对 RDD 中按值排序

    我有一个火花对 RDD 键 计数 如下 Array String Int Array a 1 b 2 c 1 d 3 使用spark scala API如何获取按值排序的新RDD对 所需结果 Array d 3 b 2 a 1 c 1 这应
  • 运行pyspark时没有这样的文件或目录错误

    我安装了 Spark 但是当我运行时pyspark在终端上 我得到 usr local Cellar apache spark 2 4 5 1 libexec bin pyspark line 24 Users miguel spark 2
  • 在 Jupyter 笔记本中使用 PySpark 读取 XML

    我正在尝试读取 XML 文件 df spark read format com databricks spark xml load path to my xml 并收到以下错误 java lang ClassNotFoundExceptio
  • 无法证明与路径相关类型的等价性

    为什么最后一个summon编译失败 我该怎么做才能让它编译 import java time LocalDateTime LocalTime trait Circular T type Parent given localTimeCircu
  • Scala 如何使用我的所有核心?

    object PrefixScan sealed abstract class Tree A case class Leaf A a A extends Tree A case class Node A l Tree A r Tree A
  • 强制类型差异

    在 Scala 中 我可以在编译时强制执行类型相等 例如 case class Foo A B a A b B implicit ev A B scala gt Foo 1 2 res3 Foo Int Int Foo 1 2 scala
  • Scala:var List 与 val MutableList

    在 Odersky 等人的 Scala 书中 他们说使用列表 我还没有从头到尾读过这本书 但所有的例子似乎都使用了 val List 据我了解 还鼓励人们使用 vals 而不是 vars 但在大多数应用程序中 使用 var List 或 v
  • 源值 1.5 的错误已过时,将在未来版本中删除

    我使用 scala maven plugin 来编译包含 scala 和 java 代码的项目 我已经将源和目标设置为1 7 但不知道为什么maven仍然使用1 5 这是我在 pom xml 中的插件

随机推荐

  • Git 挑选那些包含关键字(跟踪 ID)的提交

    出于代码审查的目的 我想 樱桃选择特定提交 与他们一起创建一个新分支并 将该分支推送到远程 这样我就可以将分支 url 提供给同行进行审核 我想创建一个 shell 脚本并发出简单的命令 例如 git review
  • 无法在 sqlfiddle (oracle) 上的选择查询中使用列名

    如果我错误地使用 sqlfidle 或者缺少功能 我不会 重现步骤 选择oracle选项 左上角 创建表并插入数据 CREATE TABLE products P Id int ProductName varchar2 10 UnitPri
  • 如何测试 img 标签是否存在?

    if I do expect img not toBe null 然后我得到一个错误 Error expect called with WebElement argment expected a Promise Did you mean t
  • 从 Azure 中的图像调整器请求接收到 400 错误请求

    ImageResizer 作为 Azure 虚拟应用程序运行 在 ImageResizer 修补程序之后 这在 Azure 模拟中运行良好 但在 Azure 云中遇到问题 如果未指定查询字符串参数并且 URL 重定向到 Blob 存储 则图
  • 解释一下这个 Kotlin 函数结构

    我正在使用这个 Kotlin 函数 我知道我们有一个函数叫做mPasswordView setOnEditorActionListener 采用参数TextView OnEditorActionListener 但是后面是什么 我们的参数里
  • 来自 cURL 请求的 RestSharp POST 请求翻译

    我正在尝试使用 RestSharp 发出 POST 请求以在 JIRA 中创建问题 而我必须使用的是一个使用 cURL 的示例 我对这两者都不够熟悉 不知道我做错了什么 这是example https developer atlassian
  • 使用 mongod 以 utc 存储日期时如何处理时区问题?

    我有一个 mongodb 集合 其中每个文档都有一些属性和 utc 时间戳 我需要从集合中提取数据并使用聚合框架 因为我使用集合中的数据在用户界面上显示一些图表 但是 我需要根据用户的时区进行聚合 假设我知道用户的时区 从浏览器的请求或以其
  • 使用 Python 解析文件 (ics/icalendar)

    我有一个以下格式的 ics 文件 解析它的最佳方法是什么 我需要检索每个条目的摘要 描述和时间 BEGIN VCALENDAR X LOTUS CHARSET UTF 8 VERSION 2 0 PRODID Lotus Developme
  • XSD 中复杂类型的选择

    在 XML 模式中 复杂类型可以包含其他复杂类型的选择吗 我找不到任何相关信息 是的 请参阅第3 4 2节 http www w3 org TR xmlschema 1 declare typeXML 模式参考 AcomplexType确实
  • Chrome Mobile 108:touchcancel 未触发

    当触摸被取消时 如何让处理程序运行 例如 我可以取消突出显示按钮当 CSS 不能帮我做这件事时 https stackoverflow com questions 74843213 chrome mobile javascript even
  • 将元素附加到matlab中的单元格

    为了将 类似 python 的追加 元素添加到 Matlab 列表中 我使用以下迭代 list element1 element2 list 如果列表元素是数字或字符 它就可以正常工作 但如果我尝试将它们结合起来 它就不起作用 我希望将 2
  • 使用OpenID登录多个域:这个方案可行吗?

    例如 我们正在两个域上运行两个社区站点 称它们为example com and example net 我们希望以后能够将其扩展到更多领域 我们希望允许多种类型的登录 OpenID Facebook Twitter 标准用户名 密码 我们希
  • ASP.NET MVC5 应用程序在授权时抛出 NullReferenceException

    我有一个 MVC5 应用程序 它在生产服务器使用时 Authorize 控制器上的属性 该应用程序正在使用表单身份验证 生产服务器是 Server 2008 SP 2 NET 4 5 1 和 IIS 7 堆栈跟踪的开头是 NullRefer
  • 如何为 firebase 数据编写完成处理程序?

    因此 我之前在使用 firebase 的 观察 时遇到了问题 并且我意识到我无法从异步工作的代码块内部引入变量值 一位用户告诉我使用完成处理程序来解决此问题 他的示例是 func mapRegion completion MKCoordin
  • JPA 当前没有活动的交易

    将 JPA 与 EclipseLink 实现结合使用 Code try if em getTransaction isActive em getTransaction begin System out println 2 em persis
  • Azure Databricks 中 DBFS 的数据大小限制是多少

    I read here https forums databricks com questions 8331 is there a size limit on files i put into dbfs fil htmlAWS Databr
  • websocket._exceptions.WebSocketProxyException:通过代理连接失败状态:503

    提供的答案需要一些有关 qlik 服务器身份验证的更多详细信息 我正在尝试连接到qlik通过 WebSocket 使用证书 Error websocket exceptions WebSocketProxyException failed
  • 如何在 Windows 10 邮件应用程序中打开带有附件的新电子邮件

    我正在尝试向我的 C Net 应用程序添加一项功能 以便用户通过电子邮件发送文件 当用户安装了 Outlook 后 我可以成功使用 Outlook 互操作 API 来完成我想要的操作 然而 在新安装的 Windows 10 中 我无法弄清楚
  • 为什么 .NET 使用银行舍入作为默认值?

    根据文档 decimal Round http msdn microsoft com en us library zy06z30k aspx方法使用舍入到偶数算法 这对于大多数应用程序来说并不常见 因此 我总是最终编写一个自定义函数来执行更
  • 对于小数据,Spark shuffle 读取需要花费大量时间

    我们正在运行以下阶段的 DAG 并且对于相对较小的 shuffle 数据大小 每个任务大约 19MB 经历了较长的 shuffle 读取时间 一个有趣的方面是每个执行器 服务器内的等待任务具有相同的随机读取时间 以下是其含义的示例 对于以下