Spark 作业在 YARN 模式下失败

2024-04-09

我有一个用 Scala 编写的 Spark 程序,它从 HDFS 读取 CSV 文件,计算新列并将其保存为 parquet 文件。我正在 YARN 集群中运行该程序。但每次我尝试启动它时,执行程序都会在某个时候失败并出现此错误。

您能帮我找出可能导致此错误的原因吗?

从执行者登录

16/10/27 15:58:10 WARN storage.BlockManager: Putting block rdd_12_225 failed due to an exception
16/10/27 15:58:10 WARN storage.BlockManager: Block rdd_12_225 could not be removed as it was not found on disk or in memory
16/10/27 15:58:10 ERROR executor.Executor: Exception in task 225.0 in stage 4.0 (TID 465)
java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:118)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:110)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 15385 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
    ... 41 more

EDIT :

有使用的代码

var df = spark.read.option("header", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").csv(hdfsFileURLIn).repartition(nPartitions)
df.printSchema()
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK)
df.repartition(nPartitions, $"ipix").write.mode("overwrite").option("spark.hadoop.dfs.replication", 1).parquet(hdfsFileURLOut)

用户函数 a2p 只是接受两个 Double 并返回另一个 double

我需要说的是,这对于相对较小的 CSV (~1Go) 效果很好,但对于较大的 CSV (~15Go) 每次都会发生此错误

编辑2: 按照建议,我禁用了重新分区并使用了 StorageLevel.DISK_ONLY

这样我就不会因为异常而导致 Putting block rdd_***** failed,但仍然存在与 LZ4 相关的异常(流已损坏):

16/10/28 07:53:00 ERROR util.Utils: Aborting task
java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
    at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:254)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 12966 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
    ... 25 more

编辑3:我通过删除第二个重新分区(使用 ipix 列重新分区的分区)成功地启动了它,没有任何错误,我将进一步查看此方法的文档

编辑4:这很奇怪,有时一些执行器会因分段错误而失败:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f48d8a47f2c, pid=3501, tid=0x00007f48cc60c700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 1.8.0_102-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# J 4713 C2 org.apache.spark.unsafe.types.UTF8String.hashCode()I (18 bytes) @ 0x00007f48d8a47f2c [0x00007f48d8a47e60+0xcc]
#
# Core dump written. Default location: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/core or core.3501
#
# An error report file with more information is saved as:
# /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/hs_err_pid3501.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

我检查了内存,我的所有执行器总是有足够的可用内存(至少 6Go)

编辑4:所以我用多个文件进行了测试,执行总是成功,但有时一些执行程序失败(出现上述错误)并由 YARN 再次启动


您使用的是哪个版本的 lz4-java?这可能与 1.1.2 版本中修复的问题有关 - 请参阅此错误报告 https://github.com/jpountz/lz4-java/issues/13

另外,我对你的函数 a2p 很好奇。理想情况下,它应该采用两个 Column 对象作为输入,而不仅仅是 Doubles(除非您将其注册为 UDF)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 作业在 YARN 模式下失败 的相关文章

随机推荐

  • 使用活动管理配置设置活动管理命名空间

    我正在使用带有设计的活动管理 并且尝试为所有活动管理路由设置命名空间 在 active admin rb 中 我将 default namespace 设置为 config default namespace abc 123 在我的路线中
  • 设置 ~/.conda/pkgs 目录的路径

    在主目录空间非常有限的集群上工作 因此我想将 conda pkgs 文件夹的主目录默认设置为另一个目录 但我似乎不知道如何执行此操作 我尝试过将 condarc 文件与 pkg dirs 一起使用 channels conda forge
  • 停止服务后如何从传感器取消注册侦听器?

    我正在创建一个应用程序 该应用程序在按下 开始 按钮时启动服务 并在按下 停止 按钮时停止服务 在服务中 我为传感器 ACCELEROMETER 注册了一个侦听器 以便获得 x y z 轴的加速计值 但是当我停止应用程序并从传感器取消注册侦
  • 如何使用 boost::log::BOOST_TRIVIAL_LOG 更改默认格式?

    boost log 看起来真的很强大 它提供了一个用于简单日志记录的 BOOST LOG TRIVIAL 宏 但如何更改默认格式呢 它默认打印时间戳 但我不想要它 你有什么主意吗 似乎唯一的方法是重新定义一个新的接收器并将其添加到核心 然后
  • 使用 if 语句时,当前上下文中不存在该名称[重复]

    这个问题在这里已经有答案了 我想用用户给出的小数位数来计算 Pi 当输入为 0 时 变量 piNumber 应设置为 3 而不是 3 以便输出没有无用的逗号 这有效 static string PiNumberFinder int amou
  • 如何在 PySpark Pipeline 中使用 XGboost

    我想更新我的 pyspark 代码 在pyspark中 它必须将基础模型放入管道中 即办公室演示 http spark apache org docs latest ml pipeline html管道使用逻辑回归作为基本模型 但是 似乎无
  • CLion:调试由 python 生成的 C++ 子进程

    CLion 有没有办法调试从 Python 脚本运行的 C 程序subprocess Popen 我可以设置断点并调试Python程序 如果直接运行C 程序 我可以设置断点并调试它 但是当程序从Popen启动时 我无法触发C 断点 我尝试在
  • 如何在unix中查找字符串之间包含多个空格的行?

    我有像这样的行 1 Harry says hi 2 Ron says bye 3 Her mi oh ne is silent 4 The above sentence is weird 我需要一个 grep 命令来检测第三行 这就是我正在
  • 矩阵求逆 R

    我想求逆方形对称正定矩阵 我知道有两个功能solve and chol2inv 在 R 中 但他们的结果是不同的 我需要知道为什么会发生这种情况 谢谢 以下是计算矩阵逆的几种方法 包括solve and chol2inv gt A lt m
  • 默认情况下启用或禁用菜单项。为什么?

    我有一些遗留代码 由于某种原因 菜单项在启动时被启用或禁用 我的问题是 如何 有没有办法在不调用 EnableMenuItem 函数的情况下执行此操作 MFC 有没有办法做与资源设置所说相反的事情 我也不明白为什么当最后一个子窗口关闭时 当
  • 页面上或 .js 脚本中是否可以有多个 jQuery 就绪事件?

    我注意到 jQuery 就绪事件和函数在我的大部分 JavaScript 中只使用了一次 但是 我想知道是否可以多次使用就绪事件 例如 是否可以在我的主 html 代码中的就绪事件函数调用中调用函数 然后在 js 文件中的另一个就绪事件函数
  • 在 Woocommerce 中下订单后,将值插入自定义表中

    我需要插入到我的自定义表中许可证表 username order id Quantity This needs to be populated when an order is placed Username customer s emai
  • 这段 1988 年的 C 代码有什么问题?

    我正在尝试编译 C 编程语言 K R 一书中的这段代码 它是 UNIX 程序的基本版本wc include
  • 尝试通过我的新域访问 WordPress with LiteSpeed 时显示 404

    我已经在 Digital Ocean Droplet 中创建了一个带有开放 LiteSpeed 缓存的 WordPress 实例 如果我在 设置 常规站点 和 WordPress URL 中配置了 IP 则它可以正常工作 但是当我添加我的
  • SQL:枚举每个组内返回的行

    假设我有一个SELECT 返回某物的查询 像这样 role name MANAGER Alice WORKER Bob WORKER Evan WORKER John MANAGER Max WORKER Steve 是否可以添加另一列来枚
  • 从 Git 存储库安装 Python 包后,某些文件夹丢失

    我想从以下存储库安装软件包https github com geomin django countria https github com geomin django countria 我正在使用的命令是pip install git gi
  • Xcode 存档调试条错误

    我正在尝试将大型遗留 C 库与 iOS 应用程序集成 我们能够在设备上构建并运行 但无法存档该应用程序 归档失败并出现以下错误 命令 Applications Xcode app Contents Developer Toolchains
  • 关闭依赖项的默认功能

    我有一个依赖链 最终依赖于可选地在已弃用的库上 具体来说 我想使用间接依赖于 rustc serialize 的 nalgebra 如下所示 nalgebra gt alga gt num complex gt 可选默认值 rustc se
  • 与多处理错误的另一个混淆是,“模块”对象没有属性“f”

    我知道之前已经回答过这个问题 但似乎直接执行脚本 python filename py 不起作用 我在 SuSE Linux 上安装了 Python 2 6 2 Code usr bin python coding utf 8 from m
  • Spark 作业在 YARN 模式下失败

    我有一个用 Scala 编写的 Spark 程序 它从 HDFS 读取 CSV 文件 计算新列并将其保存为 parquet 文件 我正在 YARN 集群中运行该程序 但每次我尝试启动它时 执行程序都会在某个时候失败并出现此错误 您能帮我找出