执行许多数据帧连接时出现 PySpark OutOfMemoryErrors

2023-12-31

关于这个问题的帖子很多,但没有一个回答我的问题。

我遇到了OutOfMemoryError在 PySpark 中尝试将许多不同的数据帧连接在一起。

我的本地机器有 16GB 内存,我的 Spark 配置如下:

class SparkRawConsumer:

    def __init__(self, filename, reference_date, FILM_DATA):
        self.sparkContext = SparkContext(master='local[*]', appName='my_app')
        SparkContext.setSystemProperty('spark.executor.memory', '3g')
        SparkContext.setSystemProperty('spark.driver.memory', '15g')

显然有很多很多关于 Spark 中 OOM 错误的帖子,但基本上大多数都说要增加你的内存属性。

我本质上是从 50-60 个较小的数据帧执行连接,这些数据帧有两列uid, and data_in_the_form_of_lists(通常,它是 Python 字符串的列表)。我要加入的主数据框有大约 10 列,但还包含uid专栏(我正在加入)。

我只尝试连接 1,500 行数据。但是,当显然所有这些数据都可以放入内存时,我会频繁遇到 OutOfMemory 错误。我通过查看存储中的 SparkUI 来确认这一点:

在代码中,我的连接如下所示:

# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
                on="gid_value")

metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
                on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")

Where metric1, metric2 and metric3是我在连接之前转换为数据帧的 RDD(请记住,实际上有 50 个较小的 RDD)metric我正在加入 dfs)。

I call metric.count()强制评估,因为它似乎有助于防止内存错误(否则在尝试最终收集时我会遇到更多驱动程序错误)。

这些错误是不确定的。我没有看到它们始终出现在我的连接中的任何特定位置,有时似乎出现在我的最后一个位置metrics_df.collect()调用,有时在较小的连接期间。

我真的怀疑任务序列化/反序列化存在一些问题。例如,当我查看典型阶段的事件时间线时,我发现其中大部分由任务反序列化占用:

我还注意到垃圾收集时间很大:

垃圾收集是导致内存错误的问题吗?还是任务序列化?

编辑回答评论问题

我一直在将 Spark 作业作为更大的 PyCharm 项目的一部分来运行(因此 Spark 上下文被包裹在一个类中)。我使用以下 Spark 提交重构了代码以将其作为脚本运行:

spark-submit spark_consumer.py \
  --driver-memory=10G \
  --executor-memory=5G \
  --conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'

我遇到了类似的问题并且它适用于:
火花提交:

spark-submit --driver-memory 3g\
            --executor-memory 14g\
            *.py

Code:

sc = SparkContext().getOrCreate()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

执行许多数据帧连接时出现 PySpark OutOfMemoryErrors 的相关文章

随机推荐

  • crossfilter中的reduceAdd、reduceSum、reduceRemove函数是什么?应该如何使用它们?

    有人可以用简单的术语解释一下reduce函数如何及其参数reduceAdd reduceSum reduceRemove工作于crossfilter 请记住 映射缩减通过特定维度的键来缩减数据集 例如 让我们使用带有记录的交叉过滤器实例 n
  • 通过python上传JSON到谷歌云存储

    我正在尝试上传 JSON 我必须到谷歌云存储 我可以手动执行此操作 因此我知道它可以工作 但现在想编写一个自动执行此操作的 python 脚本 import boto import gcs oauth2 boto plugin import
  • 具有自定义 JAX-B 绑定的 JAX-WS MarshalException:无法将类型“java.lang.String”封送为元素

    我似乎对 Jax WS 和 Jax b 协同工作有疑问 我需要使用一个具有预定义 WSDL 的 Web 服务 执行生成的客户端时 我收到以下错误 javax xml ws WebServiceException javax xml bind
  • Visual Studio 实体框架向导在 MySQL 上崩溃[重复]

    这个问题在这里已经有答案了 在使用实体框架向导对 MySQL 连接执行任何操作期 间 它会在第二页上崩溃而不会出现任何错误 问题与中相同实体框架向导在 MySQL 上崩溃 https stackoverflow com questions
  • Git-SVN 清除身份验证缓存

    如何让 git svn 忘记 svn 身份验证详细信息 我们有一台运行 Windows Server 2008 的配对机器 在该机器上有一个 git 存储库 并且我们签入到中央 subversion 存储库 我希望 git 在每次签入时提示
  • 嵌套目录中的 Symfony 2 项目

    我需要在生产服务器上的嵌套目录中部署 Symfony 2 项目 实际上 这意味着所有 URL 都以 subdirectory 路径为前缀 即 http host com subdirectory project web app php su
  • 导入错误:没有名为 xlwt 的模块

    我的系统 Windows Python 2 7 我下载了一个包并想将其包含在我的脚本中 解压包后 这是我的文件夹结构 Work xlwt 0 7 3 contains a setup py xlwt 包含 init py除其他外 我的脚本从
  • 如何从Python日期时间对象中删除秒? [复制]

    这个问题在这里已经有答案了 我有一个 python 日期时间对象 我想在网站上显示它 但是时间以 hh mm ss 格式显示 我想以 hh mm 格式显示它 我已尝试按照以下方式使用替换方法 message timestamp replac
  • 使用带有附加属性的“styled()”MUI 系统实用程序 (Typescript)

    我正在使用 MUI System v5 开发一个新项目 我在用着styled 这里的实用程序 不是样式组件 用于设计和创建简单的 UI 组件 该项目采用 TypeScript 我现在有很多困难 因为我不知道是否以及如何将道具传递给这些组件
  • jquery 中的 .clone() 方法不复制值[重复]

    这个问题在这里已经有答案了 可能的重复 没有内容的文本框的 Jquery 克隆 https stackoverflow com questions 4366159 jquery clone of a textbox without the
  • 用于将文本复制到剪贴板的独立于平台的工具

    我正在尝试编写一个函数将字符串参数复制到剪贴板 我打算在我一直在编写的 Python 脚本中使用它 这是我到目前为止所拥有的 在另一个堆栈溢出帖子中找到了大部分此片段 from tkinter import Tk def copy to c
  • 用Python在文件中间插入行?

    有没有办法做到这一点 假设我有一个文件 其中包含如下名称列表 Alfred Bill Donald 我如何在第 x 行 本例中为 3 插入第三个名字 Charlie 并自动将所有其他名字发送到一行 我见过其他类似的问题 但没有得到有用的答案
  • PUT 和 DELETE HTTP 请求方法有什么用处?

    我从未使用过 PUT 或 DELETE HTTP 请求方法 我的倾向是 当系统 我的应用程序或网站 的状态可能不受影响 如产品列表 时使用 GET 而当系统状态 如下订单 受到影响时 我倾向于使用 POST 这两个不是总是足够的 还是我错过
  • 布尔玛旋转木马没有响应

    我正在尝试将 bulma carousel 合并到我的 React 应用程序中 但它似乎不起作用 我尝试使用它来实现它布尔玛旋转木马 https wikiki github io components carousel 这个文档也是如此 但
  • Blazor 服务器客户端中的引导工具提示问题

    I am trying to get the formatting right for the tooltips but i cant figure out how to The code below works perfectly
  • 在未安装 Tensorflow 的情况下运行 Tensorflow 模型

    我有一个运行良好的 TF 模型 是用 Python 和 TFlearn 构建的 有没有办法在另一个系统上运行这个模型而不需要安装 Tensorflow 它已经经过预先训练 所以我只需要通过它运行数据即可 我知道 tfcompile 在这里发
  • QLineEdit python 方式大写输入

    我使用 QT Designer 绘制了一个 UI 但发现没有参数可供我将 QLineEdit 输入设置为大写 经过一些在线搜索后 我只看到了极少数满足我需求的结果 但所有结果都是用 Qt 编写的 例如 这个link http www qtf
  • Spring Initializr 项目导致不支持的类文件主要版本 64

    当我使用创建一个新项目时弹簧初始化 https start spring io Gradle 不会构建该项目 我使用 IntelliJ IDEA 错误信息是 Exception is org gradle cache CacheOpenEx
  • 从 Scipy 稀疏矩阵中获取唯一行

    我正在 python 中处理稀疏矩阵 我想知道是否有一种有效的方法来删除稀疏矩阵中的重复行 并且只保留唯一的行 我没有找到与之相关的函数 并且不知道如何在不将稀疏矩阵转换为密集矩阵并使用 numpy unique 的情况下执行此操作 没有快
  • 执行许多数据帧连接时出现 PySpark OutOfMemoryErrors

    关于这个问题的帖子很多 但没有一个回答我的问题 我遇到了OutOfMemoryError在 PySpark 中尝试将许多不同的数据帧连接在一起 我的本地机器有 16GB 内存 我的 Spark 配置如下 class SparkRawCons