PySpark Worker 中 rdd.collect() 上的 ModuleNotFoundError

2023-12-19

我正在 python 中运行 Apache Spark 程序,并且收到一个我无法理解且无法开始调试的错误。我有一个驱动程序,它在名为 hound.py 的文件中定义了一个名为 hound 的函数。在同一目录中,我有一个名为 hound_base.py 的文件,它定义了一个名为 hound_base_func 的函数。因此,为了在 hound 中调用它,我导入“from hound_base import hound_base_func”。这可行,我调用该函数并传递 Spark 数据帧。 hound_base_func 将其作为参数,对其底层 rdd 进行一些工作,并调用 rdd.collect()。这实际上使代码崩溃,并显示错误消息“ModuleNotFoundError:没有名为‘hound_base’的模块”,这是没有意义的!这就是说无法找到代码实际上执行的模块。愿意提供尽可能多的详细信息,但这就是我所知道的与该问题相关的全部内容......有什么关于我如何解决这个问题的提示吗?

全追踪

2018-06-14 14:29:26 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
2018-06-14 14:29:26 WARN  TaskSetManager:66 - Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

2018-06-14 14:29:26 ERROR TaskSetManager:70 - Task 0 in stage 2.0 failed 1 times; aborting job
[Stage 2:>                                                          (0 + 1) / 1]Traceback (most recent call last):
  File "F:\data\src\hound.py", line 43, in <module>
    hound("fakedata.csv", "Field1", "Field2", "Field3", ["Field4a", "Field4b"])
  File "F:\data\src\hound.py", line 37, in hound
    hound_base_func(data)
  File "F:\data\src\hound_base.py", line 220, in hound_base_func
    rdd_collected = rdd_result.collect()
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\Brian\Miniconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'


    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)

    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)

    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)

    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

    at org.apache.spark.scheduler.Task.run(Task.scala:109)

    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

    at java.lang.Thread.run(Unknown Source)


Driver stacktrace:

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)

    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

    at scala.Option.foreach(Option.scala:257)

    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)

    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)

    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)

    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)

    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)

    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)

    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)

    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)

    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)

    at org.apache.spark.rdd.RDD.collect(RDD.scala:938)

    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)

    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

    at java.lang.reflect.Method.invoke(Unknown Source)

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

    at py4j.Gateway.invoke(Gateway.java:282)

    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

    at py4j.commands.CallCommand.execute(CallCommand.java:79)

    at py4j.GatewayConnection.run(GatewayConnection.java:214)

    at java.lang.Thread.run(Unknown Source)

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 216, in main
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 58, in read_command
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 170, in _read_with_length
    return self.loads(obj)
  File "C:\Users\Brian\Miniconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 559, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'hound_base'


    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)

    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)

    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)

    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)

    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

    at org.apache.spark.scheduler.Task.run(Task.scala:109)

    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

    ... 1 more


SUCCESS: The process with PID 18960 (child process of PID 6380) has been terminated.
SUCCESS: The process with PID 6380 (child process of PID 1400) has been terminated.
SUCCESS: The process with PID 1400 (child process of PID 19344) has been terminated.
[Finished in 21.811s]

这里有多个问题:

首先,您不允许从执行器任务(即从 rdd.map() 内的任何函数)访问 Spark 上下文。

其次,在 .map 的 lambda 函数内部使用外部函数很棘手。一种解决方案是如果可能的话将所有函数定义移至原始函数内。如果任何文件位于不同的文件中,则必须使用spark_context.addPyFile(path)显式添加该文件,因为在驱动程序内部导入是不够的。

这些东西解决了我遇到的这个错误的(许多)问题。请注意,由于惰性求值,它只会在 .collect() 上抛出。不好玩。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark Worker 中 rdd.collect() 上的 ModuleNotFoundError 的相关文章

随机推荐

  • 无法通过 nginx-ingress-controller 访问 Kubernetes ClusterIP 服务

    我是一名 Kubernetes 业余爱好者 尝试在 GKE 上使用 NGINX 入口控制器 我正在跟进this https cloud google com community tutorials nginx ingress gke谷歌云文
  • Docker Windows 容器 403 - 禁止:访问被拒绝

    我在我的应用程序中使用以下脚本创建了一个图像Dockerfile 但是当我使用容器的 IP 地址和端口 2000 浏览时 我收到 403 Forbidden 访问被拒绝 我使用的是 Windows 10 0 14393 Build 1439
  • android.util.Base64 编码/解码标志参数

    根据 Javadoc android util Base64 decode 采用两个参数 文本和 flags 这些标志是 int 形式并且 我引用 flags controls certain features of the decoded
  • 如何在事件上使用 RxJs 和 Socket.IO

    我想在我的内部使用 RxJSsocket on sense function data 我对可用的文档很少以及对 RxJS 缺乏了解感到困惑和困惑 这是我的问题 我有一个distSensor js有一个函数 pingEnd function
  • 如何创建具有不同部分、不同颜色的圆角矩形

    要求 我如何创建一个像这样的视图 我想在屏幕上绘制一个视图 该视图是一条分成几段的线 显示整个视图的值百分比 我的要求是 视图有不同颜色的不同部分 视图可能不会渲染所有部分 它可能只有前 2 个或第一个和最后一个或只有单一颜色等 这仅在运行
  • 在无头模式下运行 WinDbg

    有没有办法让 WinDbg 处于纯无头模式 我的用例是 我应该能够在命令行上模拟 analyze v 命令进行小型转储 而无需启动 WinDbg GUI 使用 Windows 调试工具 还可以cdb 控制台调试器 要调试故障转储 请使用 z
  • firebase 分析中的 user_engagement 事件是如何生成的?

    我看到一个名为firebase screen class 随着engagement time msec firebase event origin and firebase screen id 在里面user engagementbigQu
  • “ng 新的我的应用程序”错误

    我正在尝试使用最新版本的 Angular 生成一个新项目和骨架应用程序 我尝试使用以下命令卸载并重新安装 Angular CLI C gt npm install g angular cli latest C gt ng version a
  • 为什么 WinDbg、任务管理器和 VS 调试器报告的线程数不同?

    当我的 Net 3 5 应用程序运行时 Windows 任务管理器显示我的应用程序有 16 个线程 我收集了该进程的内存转储并使用 WinDbg SOS 打开它 运行 threads 命令显示我有 ThreadCount 456 Unsta
  • 如何将 2 个 zip 文件合并为 1 个?

    我有 2 个 zip 文件 zip1 和 zip2 我需要将这些文件合并为一个 我该如何解决 我知道我可以修复它 将 ZIP1 解压到临时文件夹 然后将其添加到 zip2 但我认为效率很低 更快的方法是什么 我在用着System IO Co
  • Android:创建自定义资源类

    Android 上的 R 类有其局限性 您不能动态使用资源来加载音频 图片或其他内容 例如 如果您不想为选定的对象加载一组音频文件 则您不能执行以下操作 R raw string upon choosen object 我是 android
  • Jenkins 签出 GIT 项目失败,权限被拒绝致命:无法分叉

    我有一个 Freestyle Jenkins 项目 它使用 bitbucket 作为 SCM 使用 ssh 作为协议和私钥 不是用户 密码 当我在 master 上构建项目时 它失败并显示以下堆栈跟踪 而它在代理上运行良好 注意错误提到 s
  • X-Frame-Options 禁止 Facebook 应用程序错误

    我正在构建一个 Facebook 应用程序 目前它处于沙盒模式 我的代码 索引 php
  • Slick 3.0如何更新变量列列表,哪个数字只有在运行时才知道

    是否可以更新变量列列表 其中的数字仅在运行时由 slick 3 0 知道 下面是我想要做的示例 不会编译 var q Query UserTable UserTable TableElementType Seq userTable var
  • 隐式参数和函数

    我在考虑 Haskell GHC 中的隐式参数时遇到问题 我有一个函数f 假设隐式参数x 并希望通过应用将其封装在上下文中f to g f x Int gt Int gt Int f n n x g Int gt Int gt Int gt
  • 如何用范围填充可变参数?

    填充可变参数的正确方法是什么 我的尝试看起来像自行车 首先我构建范围然后我将其转换为列表然后到 intarray然后传播它 m getColumns count count 35 toList toIntArray 其中 getColumn
  • 将字符串插入工作表会导致插入数字

    在我的 Google Apps 脚本中 我在电子表格中添加了一行 在附加的这一行中 我尝试插入一个值 0102 的字符串 但是插入时它会转换为数字 102 有没有什么方法可以使用 Google Apps 脚本将值插入到工作表中 而不会格式化
  • Ubuntu 自动从 Github 存储库中拉取

    我在我的服务器上安装了 git 但我希望每当我推送本地所做的更改时它都会从我的 github 存储库中提取 我研究过钩子 但它非常令人困惑 而且我找不到任何教程 有谁知道这是怎么做到的吗 我希望我的服务器在每次提交后从存储库中提取 这看起来
  • JPA 和 PostgreSQL 与 GenerationType.IDENTITY

    我有关于 Postgres 和 GenerationType Identity 与 Sequence 的问题 在这个例子中 Id SequenceGenerator name mytable id seq sequenceName myta
  • PySpark Worker 中 rdd.collect() 上的 ModuleNotFoundError

    我正在 python 中运行 Apache Spark 程序 并且收到一个我无法理解且无法开始调试的错误 我有一个驱动程序 它在名为 hound py 的文件中定义了一个名为 hound 的函数 在同一目录中 我有一个名为 hound ba