Py4JJavaError:调用 o1670.collectToPython 时发生错误

2023-11-26

我正在尝试将 Spark RDD 转换为 Pandas DataFrame。

我使用 csv 文件作为示例。该文件有 10 以下是前 3 行:

“可堆叠储物架的 Eldon 底座,铂金”,Muhammed MacIntyre,3,-213.25,38.94,35,努勒维特,存储和组织,0.8

“1.7 立方英尺紧凑型“立方体”办公冰箱”,Barry French,293,457.81,208.16,68.02,努勒维特,电器,0.58

“Cardinal Slant-D� 环形活页夹,大规格乙烯基”,Barry French,293,46.71,8.69,2.99,努勒维特,活页夹和活页夹配件,0.39

我的代码在这里:

import pandas as pd
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
sc = spark.sparkContext


from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.context import SQLContext

schema = StructType([StructField(str(i), StringType(), True) for i in range(10)])

text = sc.textFile('data_53000kb.csv')
text = text.map(lambda x: [c.strip() for c in x.split(',')])
df = spark.createDataFrame(text, schema)
df.toPandas()

此时我收到以下错误:

Py4JJavaError: An error occurred while calling o1670.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 72, localhost, executor driver): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:394)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:214)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:394)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:214)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

我现在能做什么?


df.toPandas() 将所有数据收集到驱动程序节点,因此这是非常昂贵的操作。还有一个名为 maxResultSize 的 Spark 属性

Spark.driver.maxResultSize (默认 1G) --> 每个 Spark 操作(例如收集)的所有分区的序列化结果总大小限制(以字节为单位)。至少应为 1M,或 0 表示无限制。如果总大小超过此限制,作业将被中止。限制过高可能会导致驱动程序内存不足错误(取决于spark.driver.memory和JVM中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误的影响。

如果估计的数据大小大于 maxResultSize 给定作业将被中止。这里的目标是保护您的应用程序免受驱动程序丢失的影响,仅此而已。

您可能需要增加 maxResultSize

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Py4JJavaError:调用 o1670.collectToPython 时发生错误 的相关文章

随机推荐

  • 从“docker ps”获取容器 ID 的 Shell 命令

    我基本上希望实现这两个步骤 1 运行docker镜像 docker run p 80 80 某些图像名称 25 2 现在 docker ps 返回有关容器的完整数据 但我只是在寻找容器 ID 3 对其进行一些测试 例如 docker exe
  • jquery颜色动画间歇性地抛出无效的属性值

    我正在尝试为 ASP Net 超链接的背景设置动画 以在更新面板刷新时进行黄色淡入淡出 到目前为止 它几乎在所有时间都有效 但偶尔会抛出一个 JavaScript 错误 无效的属性值 它调试到jquery颜色插件代码到这一行 fx elem
  • 为 STL 排序算法定义 < - 运算符重载、函子还是独立函数?

    我有一个包含 Widget 类对象的 stl list 它们需要根据 Widget 类中的两个成员进行排序 为了使排序工作 必须定义一个比较两个 Widget 对象的小于比较器 似乎有无数种方法可以做到这一点 据我所知 人们可以 A 在类中
  • 获取内存上的可用空间

    是否可以通过 Android SDK 获取 Android 设备 而不是 SD 卡 上的可用内存量 如果是这样 怎么办 this帖子可能很适合您的问题 还检查这个线程 这里有很多关于SO的信息 谷歌搜索了一下 这是解决方案 位于安卓 git
  • 隐藏超出 DIV 元素的文本

    我有一个固定宽度的 DIV 元素 其中有一些文本 其中没有任何空格供 HTML 解析器自动分成多行 文本超出了 DIV 的限制并弄乱了 pgae 有没有办法让超出边界的文本不可见 是否可以将其分成多行 或者更好地分成多行 并在每条折行的末尾
  • 多线程比单线程快吗?

    我想检查多线程是否比单线程快 然后我在这里做了一个演示 public class ThreadSpeedTest param args public static void main String args System out print
  • 将“C50 型号”转换为“rpart”型号

    有没有办法使用rpart plot用于绘制不属于的对象的库rpart 用于制作决策树 例如 这是经典的rpart and rpart plot正在运行的库 load libraries library rpart library rpart
  • mysql中什么是复合外键?

    在我正在使用的框架的文档中看到这个术语 复合外键 yii 什么是复合外键 在 mySql 数据库中 我的猜测是 考虑到两个表之间的关系 一个表有一列的名称与另一个表的 id 完全相同 免责声明 我做了尽职调查 并在谷歌上搜索了大约两分钟 但
  • VS 2010 Web 服务项目模板丢失?

    这可能是一个愚蠢的问题 但当我尝试创建新项目时 我找不到 Web 服务应用程序模板 您可能需要一个 WCF 服务项目 新建项目 gt Visual C 或 Visual Basic gt WCF 服务应用程序
  • 如何在 JSON 中显示带有尾随零的 BigDecimal 数字(而不是字符串)?

    在我的表示响应中 我有一个 BigDecimal 类型的字段 它的值为 2 30 但 json 响应将其显示为 2 3 有没有办法同时显示尾随零 而不将其显示为字符串 顺便说一句 我正在使用杰克逊库 version 2 3 needs to
  • 还有一个“无法加载文件或程序集......或其依赖项之一。系统找不到指定的文件”

    我有一个带有 NUnit 测试的 dll 运行良好 我将其从 Any CPU 转换为 x86 项目 因为我需要跨不同平台可靠地使用 SQLite 因此我需要包含 32 位 System Data SQLite dll 并让所有内容都引用它
  • 像 iPhone 上的地址簿排序一样对 NSString 的 NSArray 进行排序

    我有一个字符串数组 名称 我想像 iPhone 上的地址簿对它们进行排序一样对它们进行排序 例如 li gt E 下 例如 li gt A 下 例如 4li gt 在 下 有什么建议么 您需要对字符串执行不区分变音符号的比较 NSStrin
  • 对卷积神经网络中 1D、2D 和 3D 卷积的直观理解[关闭]

    Closed 这个问题不符合堆栈溢出指南 目前不接受答案 谁能通过示例清楚地解释卷积神经网络 深度学习中 中 1D 2D 和 3D 卷积之间的区别 我想用图片来解释C3D 简而言之 卷积方向 输出形状很重要 一维卷积 基础 just 1 计
  • getView() 返回 null

    我基本上有一个AsyncTask 从主运行Activity 填充一个ViewPager在一个片段内 我正在膨胀 xml 布局文件来填充ViewPager 问题是我无法获取指向布局内视图 imageview textview 的指针 以便我可
  • Android Deeplink pathPrefix 属性被忽略

    我在清单文件中为我的 Android 应用程序定义了一个深层链接
  • 如何在MVC中使用bootstrap modal编辑表格数据?

    我在 MVC 视图中有一个表 显示员工详细信息 我想添加编辑功能 但我不想在新页面中打开它 而是想使用引导模式来显示它 http twitter github com bootstrap javascript html modals 我认为
  • 在 Swing 应用程序中显示 HTML 表单并与之交互

    一个应用程序生成一些HTML 页面应该显示在应用程序本身中 These HTML 页面含有一些forms用户将使用它来输入一些值 到目前为止我已经用过文本窗格这使得HTML完美 但我不知道如何与表单交互以检索用户输入的值 是否可以使用 JT
  • 为什么 File.ReadAllBytes 结果与使用 File.ReadAllText 时不同?

    我有一个内容为 test 的文本文件 UTF 8 编码 我尝试从该文件中获取字节数组并将其转换为字符串 但它包含一个奇怪的字符 我使用以下代码 var path C Users Tester Desktop test test txt UT
  • Java javax.swing.Timer 在新线程上运行吗?

    我正在使用 javax swing Timer 来安排和运行事件 但它似乎冻结了 GUI 只是想知道这些事件是否在单独的线程上运行 或者我是否必须自己执行 Thanks 尽管所有 Timer 使用单个共享线程 由第一个执行的 Timer 对
  • Py4JJavaError:调用 o1670.collectToPython 时发生错误

    我正在尝试将 Spark RDD 转换为 Pandas DataFrame 我使用 csv 文件作为示例 该文件有 10 以下是前 3 行 可堆叠储物架的 Eldon 底座 铂金 Muhammed MacIntyre 3 213 25 38