Spark 2.1.0中读取大文件时出现内存不足错误

2024-01-02

我想使用 Spark 将大型(51GB)XML 文件(在外部 HDD 上)读取到数据帧中(使用Spark-XML 插件 https://github.com/databricks/spark-xml),进行简单的映射/过滤,重新排序,然后将其作为 CSV 文件写回磁盘。

但我总是得到一个java.lang.OutOfMemoryError: Java heap space不管我如何调整这个。

我想了解为什么增加分区数量不能阻止 OOM 错误

难道它不应该将任务分成更多部分,以便每个部分都更小并且不会导致内存问题吗?

(Spark 不可能尝试将所有内容都填充到内存中,如果不合适就会崩溃,对吗?)

我尝试过的事情:

  • 读取和写入时对数据帧重新分区/合并(5,000 和 10,000 个分区)(初始值为 1,604)
  • 使用较少数量的执行器(6、4,甚至2执行者我收到 OOM 错误!)
  • 减小分割文件的大小(默认为 33MB)
  • 提供大量内存(我拥有的全部)
  • 增加spark.memory.fraction至 0.8(默认为 0.6)
  • 减少spark.memory.storageFraction至 0.2(默认为 0.5)
  • set spark.default.parallelism到 30 和 40(我的默认值是 8)
  • set spark.files.maxPartitionBytes至 64M(默认为 128M)

我的所有代码都在这里(请注意,我没有缓存任何内容):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)

// filter and select only the cols i'm interested in
val dsout = df2
  .where( df2.col("_TypeId") === "1" )
  .select(
    df("_Id").as("id"),
    df("_Title").as("title"),
    df("_Body").as("body"),
  ).as[Post]

// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r


// more mapping
dsout
 .map{
  case Post(id,title,body,tags) =>

    val body1 = tagPat.replaceAllIn(body,"")
    val body2 = whitespacePat.replaceAllIn(body1," ")

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))

}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)

NOTES

  • 输入拆分非常小(仅 33MB),那么为什么我不能让 8 个线程每个处理一个拆分呢?它真的不应该破坏我的记忆(我已经知道

UPDATE我写了一个较短的版本只读取文件然后 forEachPartition(println) 的代码。

我遇到同样的 OOM 错误:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),
   df("_Title").as("title"),
   df("_Body").as("body"),
   df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
  }
  .foreachPartition { rdd =>
    if (rdd.nonEmpty) {
      println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
    }
  }

P.S.:我使用的是 Spark v 2.1.0。我的机器有 8 核和 16 GB 内存。


我在运行 Spark-shell 时遇到此错误,因此我将驱动程序内存增加到一个很高的数字。然后我就可以加载 XML 了。

spark-shell --driver-memory 6G

Source: https://github.com/lintool/warcbase/issues/246#issuecomment-249272263 https://github.com/lintool/warcbase/issues/246#issuecomment-249272263

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 2.1.0中读取大文件时出现内存不足错误 的相关文章

随机推荐

  • Django Test 模拟 ImageField 阻止上传或测试后清理

    我正在为我的项目开发一个测试用例 涉及一些图像和文件字段 有些对象加载了固定装置 有些对象是在测试期间创建的 我用 InMemoryUploadedFile 模拟了我的图像 创建新的测试对象时 正在上传模拟图像 到 MEDIA ROOT 目
  • 如果你的基类有一个虚拟析构函数,你自己的析构函数自动是虚拟的

    我知道标题的说法是正确的 那么常规函数呢 例如 class Father virtual void foo class Son public Father void foo class GrandSon public Son void fo
  • 让 Karma、6to5ify 和 Istanbul 合作

    我有 Browserify 6to5ify 和 Karma 可以很好地发挥作用 成功运行我的规范 然而 当我添加代码覆盖率时 事情就变糟了 我尝试了几种方法 Add browserify istanbul转换为我的 karma conf j
  • 如果我想添加类型化属性,子类化 NSNotification 是正确的途径吗?

    我正在尝试子类化NSNotification Apple 的文档NSNotification陈述以下内容 NSNotification是一个没有实例变量的类簇 像这样 你必须子类化NSNotification并重写原始方法name obje
  • 无法选择 id=":1" 的 div

    对网络东西有点菜鸟 但我有一个带有此标签的 div div class 我已经测试过我的 jQuery 可以正常工作 当前使用版本 2 1 3 我已经测试了许多其他关于选择器中冒号的 SO 帖子中推荐的选择器 然后是其他一些 我已经分别尝试
  • ruby - 如何在 minitest 示例中使用标签

    I have require minitest spec require minitest autorun require minitest tags require rspec expectations describe One happ
  • 在一行中多次使用相同模式的正则表达式

    我正在寻找的模式是这样的 TXT txt 该模式可以在任何给定行中多次出现 我想要么提取模式的每个实例 要么使用 sed 或其他任何东西 删除每个实例周围的文本 Thanks 您可以将 Perl 用作 cat file foo TXT1 t
  • 如何在 Xcode 7 中使用对象库的堆栈视图

    我最近安装了Xcode 7 测试版并发现了一些东西new在对象库中 例如 水平堆栈视图和垂直堆栈视图 当我在其中放入一些控件时 stackview 似乎可以调整大小 根据控制尺寸 当我呈现更多控制堆栈时 似乎从堆栈视图开始自动调整 任何一个
  • 获取os.Error值的一种方法——字符串值(Go)

    如何获取字符串值os Error 也就是说 分配给一个变量 例如 package main import errors fmt func main err errors New an error message s err Error fm
  • Laravel: array_merge(): 参数 #2 不是数组错误

    异常开始出现在所有视图中 当我尝试运行时composer update 它总是以 error type ErrorException message array merge Argument 2 is not an array file l
  • ArrayList#size() 大于对象的实际数量

    我的 Android 应用程序中有一个 ArrayList 其中有 2 个项目 然而 它的 size 方法返回 3 我知道这听起来非常简单和愚蠢 但它是这样的 为什么会出现这种情况呢 ArrayList 的大小如何返回错误的数字 当我迭代列
  • 无需数据库即可存储数据?

    如果我想存储电子邮件 但没有数据库 例如 MySQL 我该怎么办 数据应该可以从 PHP 访问和写入 但常规 访问者 不得看到该数据 希望你能帮忙 您可以将它们放入文件中 data Defined somewhere file put co
  • 优先 Web SDK 与 REST API

    使用 Priority 的新工具开发网站时 在哪些情况下使用 Web SDK 访问 Priority 会比使用 REST API 更好 反之亦然又如何呢 这取决于您的需求 REST API 主要用于与其他应用程序集成 而 WEB SDK 用
  • 在 Mercurial 中指定点作为修订版

    我发现一些 Mercurial 命令的修订版被指定为点 例如 hg revert all r hg update C r hg pull r 这个点的含义是什么 它对应哪个版本 hg help revisions says 保留名称 指示工
  • 在 AngularJS 应用程序中预填充远程数据的最佳方法

    在我的 AngularJS 应用程序中 我需要从远程 REST 端点检索多个静态数据集合 这些数据集合将作为静态查找列表在整个应用程序生命周期中使用 我希望所有这些列表都在初始应用程序启动时填充 并保留并可供多个控制器使用 我不想动态加载任
  • 如何导出驻留在给定 Domino 服务器上的数据库列表?

    我有一台 Lotus Domino 服务器 上面有数量惊人的 Domino 数据库 排列在各个文件夹中 是否有某种方法可以以某种电子表格格式导出所有这些数据库及其标题和创建者姓名的列表 我拥有 Domino Admin 和 Domino D
  • 开发多个 Visual Studio 2010 扩展

    我正在单独的解决方案中开发几个独立的 Visual Studio 扩展 当我打开其中一个并在调试器中启动实验实例时 其他解决方案 未打开 的其他扩展也会被加载 必须有一种方法可以一次仅在实验实例中启动一个扩展 特别是您当前正在开发的扩展 对
  • Java中如何将字符串转换为函数?

    stackoverflow上有一个类似标题的问题here https stackoverflow com questions 41283897 how to convert string into math function just on
  • 如何在 Rust 中构建多工作区货物项目

    我有多工作空间 Cargo 项目 它有两个工作区 common and server common is a lib项目和服务器是bin项目 该项目在Github中的位置是here https github com rajcspsg mul
  • Spark 2.1.0中读取大文件时出现内存不足错误

    我想使用 Spark 将大型 51GB XML 文件 在外部 HDD 上 读取到数据帧中 使用Spark XML 插件 https github com databricks spark xml 进行简单的映射 过滤 重新排序 然后将其作为