ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)

2024-03-13

I'm running a PySpark job that calls UDFs. I know UDFs are memory-hungry and slow due to serialization/deserialization, but given our circumstances we have to use them.

The dataset is 60 GB and well partitioned, and the cluster has 240 GB of memory. The job runs fine while it reads the data and applies native Spark functions, but it consistently fails with the error below as soon as it starts calling the Python UDFs. At first I assumed it was a memory problem, so I increased node and executor memory, but the problem persists. What does this error mean, and how do I fix it?

Executor log:

19/11/03 05:05:52 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/03 05:05:52 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/11/03 05:05:52 ERROR PythonUDFRunner: This may have been caused by a prior exception:
java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
19/11/03 05:05:52 ERROR Executor: Exception in task 479.0 in stage 2.0 (TID 1177)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    ... 20 more
19/11/03 05:05:52 ERROR Executor: Exception in task 478.0 in stage 2.0 (TID 1176)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/03 05:05:52 ERROR Executor: Exception in task 381.0 in stage 2.0 (TID 1079)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/03 05:05:52 ERROR Executor: Exception in task 472.0 in stage 2.0 (TID 1170)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/03 05:05:52 INFO Executor: Not reporting error to driver during JVM shutdown.
19/11/03 05:05:52 INFO Executor: Not reporting error to driver during JVM shutdown.
19/11/03 05:05:52 INFO Executor: Not reporting error to driver during JVM shutdown.
19/11/03 05:05:52 INFO Executor: Not reporting error to driver during JVM shutdown.

This error can be caused by a variety of underlying problems, so you first need to find the root cause.

First, make sure the top level of your UDF is wrapped in a try/except block and log the exception there, as in the sketch below.
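A minimal sketch of that idea (the names risky_transform, input_col, and the sample data are placeholders, not part of the original question): the try/except catches any Python-level exception raised by the per-row logic and writes the traceback to the worker's stderr, which ends up in the executor log right next to the PythonUDFRunner errors.

    import traceback

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    def risky_transform(value):
        # Placeholder for your real per-row logic.
        return value.upper()

    def safe_transform(value):
        try:
            return risky_transform(value)
        except Exception:
            # Printed to the Python worker's stderr, so the traceback
            # appears in the executor log instead of being lost.
            traceback.print_exc()
            return None

    safe_udf = udf(safe_transform, StringType())

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("ok",), (None,)], ["input_col"])
    df.withColumn("result", safe_udf(col("input_col"))).show()

If the log then shows a Python traceback, the crash is an ordinary exception in your code; if the worker still dies with no traceback, suspect a lower-level failure such as a segfault in a C extension, which the next step targets.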

Second, register a handler, e.g. import faulthandler; faulthandler.enable(), as early as possible in both the driver and the worker code (the UDF); see the sketch below. If the cause is a segmentation fault, the handler prints a stack trace that will be visible in the logs.
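A sketch of where to place the calls (udf_impl and its body are illustrative placeholders): enabling faulthandler at module level covers the driver, but the UDF body is the only code guaranteed to run inside each Python worker process, so it is enabled there as well.

    import faulthandler

    # Driver side: enable as early as possible, ideally before the
    # SparkSession is created, so driver crashes also dump a traceback.
    faulthandler.enable()

    def udf_impl(value):
        # Worker side: enable() is idempotent and relatively cheap, so
        # calling it on every invocation is harmless.
        import faulthandler
        faulthandler.enable()
        return len(value) if value is not None else None  # placeholder logic

On a segmentation fault, faulthandler prints the Python stack of every thread to stderr before the process dies, and that output lands in the executor's stderr log. If you are on a newer Spark release (3.2+), there is also a spark.python.worker.faulthandler.enabled configuration that enables this in the workers without code changes.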

Either approach should help you uncover the underlying problem.
