Spark问题中读取大文件 - python

2024-05-13

我已经使用 python 在本地安装了 Spark,并在运行以下代码时:

data=sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

我收到以下错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-fca93c6aedeb> in <module>()
----> 1 data.first()

C:\Spark\python\pyspark\rdd.pyc in first(self)
   1313         ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

C:\Spark\python\pyspark\rdd.pyc in take(self, num)
   1295 
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298 
   1299             items += res

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941 

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
   1024         answer = self.gateway_client.send_command(command)
   1025         return_value = get_return_value(
-> 1026             answer, self.gateway_client, self.target_id, self.name)
   1027 
   1028         for temp_arg in temp_args:

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    314                 raise Py4JJavaError(
    315                     "An error occurred while calling {0}{1}{2}.\n".
--> 316                     format(target_id, ".", name), value)
    317             else:
    318                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

我确信路径是正确的,因为我已经尝试过同一文件夹中的其他文件。我认为问题在于文件的大小,大约为 3.4 GB。

有什么帮助吗?


无论您是在单机模式还是集群模式下使用 Spark,spark.driver.memory and spark.executor.memory默认为 1GB 内存。您可以为两者添加更多内存driver and executors通过在启动 Jupyter Notebook 时或在 Spark Conf 文件中更改此配置。这样,如果您的计算机上有必要的 RAM,您应该能够读取 3.4GB 的 CSV 文件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark问题中读取大文件 - python 的相关文章

  • 关于使用Python启动SSH隧道的问题

    我在从用 Python 编写的 HTTP RPC 服务器启动 SSH 隧道时遇到了麻烦 基于Python的BaseHTTPServer 有一个用Python编写的简单的HTTP RPC服务器 作为其中一项服务的一部分 我想启动从 RPC 服
  • QSortFilterProxyModel + QAbstractItemModel modelIndex.internalPointer() 导致崩溃

    我在 PyQt 4 8 Python 2 7 中实现了自己的 QAbstractItemModel class FriendListModel QtCore QAbstractItemModel def init self groups c
  • 使用 Marshmallow 中的数据更新行 (SQLAlchemy)

    我正在使用 Flask Flask SQLAlchemy Flask Marshmallow marshmallow sqlalchemy 尝试实现 REST api PUT 方法 我还没有找到任何使用 SQLA 和 Marshmallow
  • 如何配置散景图以具有响应宽度和固定高度

    我使用通过组件功能嵌入的散景 实际上我使用 plot sizing mode scale width 它根据宽度进行缩放并保持纵横比 但我想要一个响应宽度但固定或最大高度 这怎么可能实现呢 有stretch both and scale b
  • 刷新访问令牌时出现“invalid_grant”错误的情况?

    最近我一直在为这个问题揪心 一些背景 使用oauth2客户端 https code google com p google api python client 库来管理用户的令牌 这些令牌用于定期并发执行各种后台任务 每次要为用户运行其中一
  • OpenCV Python 删除图像中的某些对象

    我正在使用带有 opencv 和 numpy 的 python 来检测天文中的星星 例如这个1 https i stack imgur com AKwEJ jpg图片 使用模板匹配 我可以用阈值检测星星 单击 2 2 https i sta
  • Python中#和"""注释的区别

    开始用 Python 编程 我看到一些带有注释的脚本 and comments 这两种评论方式有什么区别 最好的事情就是阅读PEP 8 Python 代码风格指南 https www python org dev peps pep 0008
  • 运行源代码中包含 Unicode 字符的 Python 2.7 代码

    我想运行一个在源代码中包含 unicode utf 8 字符的 Python 源文件 我知道这可以通过添加评论来完成 coding utf 8 在一开始的时候 但是 我希望不使用这种方法来做到这一点 我能想到的一种方法是以转义形式编写 un
  • Keras,如何获取每一层的输出?

    我已经用 CNN 训练了一个二元分类模型 这是我的代码 model Sequential model add Convolution2D nb filters kernel size 0 kernel size 1 border mode
  • 用于打印 C/C++ 文件的所有函数定义的 Python 脚本

    我想要一个 python 脚本来打印 C C 文件中定义的所有函数的列表 e g abc c定义两个函数为 void func1 int func2 int i printf d i return 1 我只想搜索文件 abc c 并打印其中
  • 代理阻止网络套接字?如何绕行

    我有一个用 Python 编写的正在运行的 websocket 服务器 来自https github com opiate SimpleWebSocketServer https github com opiate SimpleWebSoc
  • Python 3.x 中的 PIL ImageTk 等效项

    我正在使用 Tkinter 开发一个应用程序 它使用以下数据库png图标的图像文件 为了在应用程序中使用所述图像 我使用 PIL 打开它们Image open 运行它通过ImageTk PhotoImage函数 然后将其传递给小部件构造函数
  • python 语言环境奇怪的错误。这究竟是怎么回事?

    所以今天我升级到了 bazaar 2 0 2 我开始收到这条消息 顺便说一句 我在雪豹上 bzr warning unknown locale UTF 8 Could not determine what text encoding to
  • 超时时杀死或终止子进程?

    我想尽可能快地重复执行子进程 然而 有时这个过程会花费太长的时间 所以我想杀死它 我使用 signal signal 如下所示 ppid pipeexe pid signal signal signal SIGALRM stop handl
  • Python将csv数据导出到文件中

    我有以下运行良好的代码 但我无法修剪数据并将其存储在数据文件中 import nltk tweets love this car this view amazing not looking forward the concert def g
  • Scrapy - 不会爬行

    我正在尝试运行递归爬行 由于我编写的爬行不能正常工作 因此我从网络上提取了一个示例并进行了尝试 我真的不知道问题出在哪里 但是爬行没有显示任何错误 谁能帮我这个 另外 是否有任何逐步调试工具可以帮助理解蜘蛛的爬行流程 非常感谢任何与此相关的
  • Python:使用列表创建二叉搜索树

    我的代码的目标是从 txt 文件中获取每个单独的单词并将其放入列表中 然后使用该列表创建二叉搜索树来计算每个单词的频率 并按字母顺序打印每个单词及其频率 中的每个单词只能包含字母 数字 或 我无法用我的初学者编程知识来做的部分是使用我拥有的
  • 对 pandas 数据框中的每一列应用函数

    我如何以更多的熊猫方式编写以下函数 def calculate df columns mean self df means for column in df columns columns tolist cleaned data self
  • 带有整数的 np.sqrt 和 where 条件返回错误结果

    当我将 numpy sqrt 方法应用于带有 a 的整数数组时 我得到了奇怪的结果where健康 状况 见下文 对于整数 a np array 1 4 9 np sqrt a where a gt 5 Out 3 array 0 0 5 3
  • 升级后 pip 损坏

    我做了 pip install U easyinstall 然后 pip install U pip 来升级我的 pip 但是 当我尝试使用 pip 时 我现在收到此错误 root d8fb98fc3a66 which pip usr lo

随机推荐