在 Python 中并行处理大型 .csv 文件

2023-12-26

我正在使用 Python 脚本处理大型 CSV 文件(大约有 10M 行的几个 GB 的量级)。

这些文件具有不同的行长度,并且无法完全加载到内存中进行分析。

每一行都由我的脚本中的函数单独处理。分析一个文件大约需要 20 分钟,看来磁盘访问速度不是问题,问题在于处理/函数调用。

代码看起来像这样(非常简单)。实际代码使用 Class 结构,但这是类似的:

csvReader = csv.reader(open("file","r")
for row in csvReader:
   handleRow(row, dataStructure)

鉴于计算需要共享数据结构,那么利用多核在 Python 中并行运行分析的最佳方法是什么?

一般来说,如何从 Python 中的 .csv 一次读取多行以传输到线程/进程?循环使用for在行上听起来效率不高。

Thanks!


这可能为时已晚,但对于未来的用户,我无论如何都会发布。另一张海报提到使用多处理。我可以保证这一点,并且可以提供更多细节。我们每天使用 Python 处理数百 MB/数 GB 的文件。所以这绝对取决于任务。我们处理的一些文件不是 CSV,因此解析可能相当复杂,并且比磁盘访问花费的时间更长。但是,无论文件类型如何,方法都是相同的。

您可以同时处理大文件的各个部分。这是我们如何做到这一点的伪代码:

import os, multiprocessing as mp

# process file function
def processfile(filename, start=0, stop=0):
    if start == 0 and stop == 0:
        ... process entire file...
    else:
        with open(file, 'r') as fh:
            fh.seek(start)
            lines = fh.readlines(stop - start)
            ... process these lines ...

    return results

if __name__ == "__main__":

    # get file size and set chuck size
    filesize = os.path.getsize(filename)
    split_size = 100*1024*1024

    # determine if it needs to be split
    if filesize > split_size:

        # create pool, initialize chunk start location (cursor)
        pool = mp.Pool(cpu_count)
        cursor = 0
        results = []
        with open(file, 'r') as fh:

            # for every chunk in the file...
            for chunk in xrange(filesize // split_size):

                # determine where the chunk ends, is it the last one?
                if cursor + split_size > filesize:
                    end = filesize
                else:
                    end = cursor + split_size

                # seek to end of chunk and read next line to ensure you 
                # pass entire lines to the processfile function
                fh.seek(end)
                fh.readline()

                # get current file location
                end = fh.tell()

                # add chunk to process pool, save reference to get results
                proc = pool.apply_async(processfile, args=[filename, cursor, end])
                results.append(proc)

                # setup next chunk
                cursor = end

        # close and wait for pool to finish
        pool.close()
        pool.join()

        # iterate through results
        for proc in results:
            processfile_result = proc.get()

    else:
        ...process normally...

就像我说的,这只是伪代码。它应该让任何需要做类似事情的人开始。我面前没有代码,只是凭记忆做。

但我们在第一次运行时获得了超过 2 倍的速度提升,而无需对其进行微调。您可以根据您的设置微调池中的进程数量以及块的大小以获得更高的速度。如果您像我们一样有多个文件,请创建一个池来并行读取多个文件。只是要小心,不要让太多进程使盒子超载。

注意:您需要将其放入“if main”块中,以确保不会创建无限进程。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Python 中并行处理大型 .csv 文件 的相关文章

随机推荐

  • 您将如何在这里使用敏捷? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是敏捷的大力支持者 但我的一个朋友 他还不知道敏捷 他是管理类型 问我如何规划和开发一个复杂的分布式项目 其中包含数据库层 通信层层 接口以及
  • 使用 Go 获取 Python 版本

    我正在尝试使用 Go 获取我的 Python 版本 import log os exec strings func verifyPythonVersion err exec LookPath python if err nil log Fa
  • 将 dll 与 node-ffi 一起使用

    我正在使用 node ffi 访问我购买的自定义硬件提供的 dll 该 dll 使用设备驱动程序来执行操作 他们不提供 dll 文档 但他们有一个 c 中的示例应用程序 该 dll 在 c 中使用像这样 DllImport POS CIDR
  • 我对 REST 有什么不理解的地方?

    我正在构建一个框架 并希望使用它进行构建的开发人员能够允许其部分内容与其他站点共享数据并允许其他站点添加 编辑 删除数据 例如 如果有人制作了一个包含书评 作者 引言 代码示例 评论等的网站 那么开发人员可以制作例如 书评 对于其他网站来说
  • 处理 django 包含模板标签中的请求

    我是 Django 新手 正在尝试将上传文件表单放入包含标签中 所以我可以在各种模板中使用它 我创建了以下包含标签 upload files py register inclusion tag upload form html def up
  • PrimeFaces p:fileUpload 不调用方法

    我正在尝试使用 PrimeFaces
  • MVVM 是否违反了 DRY?

    看来我制作的 ViewModels 看起来可疑地像其他班级一样而且它们似乎需要大量的代码重复 例如在当前的项目中我有 SmartForm Model that represents a data form to fill in has pr
  • 将本地图片上传到tinyMCE

    tinyMCE有一个插入图像按钮 但如何处理其功能 请给出一些代码 我已经对 pavanastechie 编写的代码投了赞成票 但最终我重写了很多次 这是一个更短的版本 可能对某些人有价值 tinymce init toolbar imag
  • 如何在Python中创建链表

    我正在尝试解决 python 中的链表编码挑战 我只给出了以下课程来创建链接列表 Definition for singly linked list class ListNode object def init self x self va
  • 单击小部件时播放声音

    这是我的代码 它打开主要活动 但我似乎找不到一种方法来让小部件播放声音 我尝试过了 向小部件添加一个按钮 不起作用 add an OnClickListener到主要活动 有效 但它打开主要活动 我只想要声音而不是活动 编写一个新方法来播放
  • 了解 ASP.NET WebForms 中控件处于生命周期的哪个阶段

    从控件的外部 是否可以找出特定控件或页面处于页面生命周期的哪个阶段 初始化 加载 预渲染等 例如 在伪代码中 if myControl CurrentLifeCycle Lifecycle Init do something 恐怕没有内置函
  • Qt QSqlQuery 准备和bindValue 不工作

    我在准备和绑定值时遇到问题 db open QSqlQuery q q prepare SELECT id malade nom prenom FROM Malade WHERE nom LIKE p OR prenom f q bindV
  • 如何使用 SetWindowsHookEx 和 WH_KEYBOARD 挂钩外部进程

    我试图挂钩例如记事本但没有成功 制作一个全局钩子似乎效果很好 在 XP SP2 上测试 编辑 修改后的代码现在可以使用 MyDLL代码 include
  • 如何以编程方式打印各种文件类型

    我正在编写一个应用程序 它执行一些测试并生成许多不同的报告 这些可以是标签 最终客户的 PDF 维修部门的 PDF XML 文件等的任意组合 根据报告类型 我需要将文件发送到文件系统或多种不同打印机 A4 标签等 之一 理想情况下不应该有弹
  • F# 是否具有与 C# 的“不安全”块等效的语法

    大量的数组边界检查会降低速度 对于二维数组尤其如此 有没有办法在 F 中编写不安全的代码块 我不是一个F http cs hubfs net blogs f team archive 2006 08 15 506 aspx程序员 但据我所知
  • 如何取消订阅使用 lambda 表达式的事件?

    我有以下代码让 GUI 响应集合中的更改 myObservableCollection CollectionChanged sender e gt UpdateMyUI 首先 这是一个好方法吗 第二 取消订阅此活动的代码是什么 是否相同 但
  • 如何在不指定变量来保存其 OUT 参数的情况下调用 PL/SQL 过程?

    我想调用指定了 OUT 参数的 PL SQL 存储过程 但我不关心返回值 我只关心程序是否成功执行 即没有抛出异常 我是否必须在调用 PL SQL 块中定义一个虚拟变量才能接收 out 参数 即使我不想要它 它使我的调用代码变得混乱 是的
  • 运行 Spark 作业时 CPU 使用率低

    我正在运行 Spark 作业 我有 4 个核心 工作内存设置为 5G 应用程序主机位于同一网络中的另一台计算机上 并且不托管任何工作程序 这是我的代码 private void myClass configuration of the sp
  • super(&nil) 在 ruby​​ 中做什么?

    我正在读书并发 ruby 的源代码 https github com ruby concurrency concurrent ruby blob master lib concurrent executor abstract executo
  • 在 Python 中并行处理大型 .csv 文件

    我正在使用 Python 脚本处理大型 CSV 文件 大约有 10M 行的几个 GB 的量级 这些文件具有不同的行长度 并且无法完全加载到内存中进行分析 每一行都由我的脚本中的函数单独处理 分析一个文件大约需要 20 分钟 看来磁盘访问速度