Python - 使用 pandas 多重处理多个大尺寸文件

2023-12-07

我有一个y.csv文件。文件大小为 10 MB,包含来自Jan 2020 to May 2020.

我每个月还有一个单独的文件。例如data-2020-01.csv。它包含详细的数据。每个月文件的文件大小约为1 GB.

我正在分割y.csv按月份,然后通过加载相关月份文件来处理数据。当我去很多个月时,这个过程花费的时间太长。例如24个月。

我想更快地处理数据。我可以访问 AWSm6i.8xlarge实例有32 vCPU and 128 GB memory.

我是多处理新手。那么有人可以在这里指导我吗?

这是我当前的代码。

import pandas as pd

periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB


def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return


for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)

    for index, row in filtered_y.iterrows():
        process(month_df, index)

多线程池非常适合共享y线程之间的数据帧(消除了使用共享内存的需要),但不太擅长并行运行 CPU 密集型处理。多处理池非常适合执行 CPU 密集型处理,但在跨进程共享数据而不提供内存碎片表示时效果不佳。y数据框。

在这里,我重新排列了您的代码,以便使用多线程池来创建filtered_y对于每个时期(其中is这是一个 CPU 密集型操作,但 pandas 确实为某些操作释放了全局解释器锁——希望是这个)。然后我们只将一个月的数据传递到多处理池,而不是整个数据ydataframe,使用工作函数处理该月process_month。但由于每个池进程都无权访问ydataframe,它只返回需要用要替换的值更新的索引。

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool, cpu_count

def process_month(period, filtered_y):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool

    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB

    MAX_THREAD_POOL_SIZE = 100
    thread_pool_size = min(MAX_THREAD_POOL_SIZE, len(periods))
    multiprocessing_pool_size = min(thread_pool_size, cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool, \
    ThreadPool(thread_pool_size) as thread_pool:
        thread_pool.map(process, periods)
        
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()

仅使用单个多处理池的版本

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool, cpu_count

def process_month(period):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    for results in results_list:
        for index, value in results:
            y.loc[index, "result"] = value
    # Write out new csv file:
    ...

# Required for Windows:
if __name__ == '__main__':
    main()

现在,它的一个变体使用更多的内存,但允许主进程将其处理与多处理池重叠。如果需要更新的索引数量非常大,这可能会很有用:

...
def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count() - 1) # save a core for the main process
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        # Process values as soon as they are returned:
        for results in multiprocessing_pool.imap_unordered(process_month, periods):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out new csv file:
    ...

最后一个版本可能会更优秀,因为它在将任务提交到池之前首先读取 csv 文件,并且根据平台及其缓存 I/O 操作的方式,可能会导致工作函数不必执行任何物理 I/O 来读取在其文件副本中。但那又是一个10M的文件被读入内存了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python - 使用 pandas 多重处理多个大尺寸文件 的相关文章

  • 使用 Pandas 解析时避免 Excel 的科学记数法舍入

    我有一个自动生成的 Excel 文件 其中偶尔包含非常大的数字 例如135061808695 在 Excel 文件中 当您单击单元格时 它会显示完整的数字135061808695然而 在视觉上 使用自动 常规 格式 数字显示为1 35063
  • 从日志文件 python 创建 csv 标题

    我的日志文件每行都包含一些信息 如下所示 Info1 NewOrder key 123 Info3 10 Info5 abc Info3 10 Info1 OldOrder key 456 Info6 xyz Info1 NewOrder
  • python subprocess proc.stderr.read() 引入额外的行?

    我想运行一些命令并抓取输出到 stderr 的任何内容 我有两个版本的函数可以执行此操作 版本 1 def Getstatusoutput cmd Return status output of executing cmd in a she
  • QFileDialog 作为 TableView 的编辑器:如何获取结果?

    我正在使用一个QFileDialog作为某些专栏的编辑QTableView 这基本上有效 对一些焦点问题取模 请参阅here https stackoverflow com questions 22854242 qfiledialog as
  • NumPy:linalg.eig() 和 linalg.eigh() 之间的区别

    在 Python 3 应用程序中 我使用 NumPy 来计算对称实矩阵的特征值和特征向量 这是我的演示代码 import numpy as np a np random rand 3 3 generate a random array sh
  • 使用 Click 在 python 中创建命令行应用程序

    我正在使用 Python 创建一个命令行应用程序Click http click pocoo org 接受名称作为输入的库 但如果未输入名称 则返回默认值 这是我到目前为止的代码 hello py import click click ve
  • 如何在 pygame 中水平翻转图像?

    这是在 pygame 如何翻转图像 假设一个图像 猪向右看 时向左看 我按向左箭头键 然后保持这样 即使我不按任何键或者按向上和向下箭头键 那么 当我按向右箭头键时 如何再次将其切换回向右看 并使其保持这种状态 即使我不按任何键或按向上和向
  • 如何从包含许多表的 Excel 工作表中解析数据帧(使用 Python,可能使用 Pandas)

    我正在处理布局糟糕的 Excel 工作表 我正在尝试解析这些工作表并将其写入数据库 每个工作表可以有多个表 尽管这些可能的表格的标题是已知的 但哪些表格将位于任何给定的工作表上 它们在工作表上的确切位置也不是已知的 表格不以一致的方式对齐
  • 如何在Python中将字符串转换为包含一个元素的列表[重复]

    这个问题在这里已经有答案了 我有一个字符串 我想将其转换为其中只有一个元素的列表 a abc print list a output a b c Expected o p abc 正确的做法是什么 只需使用 a abc b a print
  • 在此异步设置中,我在哪里捕获 KeyboardInterrupt 异常

    我正在开发一个使用ccxt异步库 它要求通过显式调用该类的资源来释放某个类使用的所有资源 close 协程 我想退出程序ctrl c并等待异常中的关闭协程 然而 它永远不会被等待 该应用程序由模块组成harvesters strategie
  • 使用 South 更改 Django 模型列默认值

    我在 Django 项目中使用 South 和 Postgresql DB 我想更改一个模型字段的默认值以供继续使用 我不需要以前的记录 刚刚新记录 我是否需要为此进行迁移 或者只是更改模型 旧场详细信息 background style
  • Python,socket.error:[Errno 10049]

    在开发一个简单的聊天客户端的基础上 遇到以下错误 socket error Errno 10049 The requested address is not valid in its context 代码是 from socket impo
  • 在硬件级别模拟按键 - Windows

    我正在寻找一种语言或库 使我能够在最大可能的水平上模拟击键 而无需实际按下按键 我对击键级别的具体衡量标准是 当我的计算机已经运行按键侦听器 例如鼠标键和粘滞键 时 它是否会产生与物理按键相同的输出 我尝试过很多击键模拟的方法 java A
  • 我们可以限制 luigi 任务的吞吐量吗?

    我们有一个 Luigi 任务 它向第三方服务请求一条信息 我们对该 API 调用每分钟可以执行的调用请求数量受到限制 有没有办法在每个任务的基础上指定调度程序每单位时间必须运行多少个此类任务 我们在任务中实施了自己的速率限制 我们的 API
  • 发送fulfillmentText并使用followupEventInput转移到另一个意图

    我使用 Python Flask 设置了一个简单的 Webhook 来处理各种 Dialogflow 功能 在这一点上一切都进展顺利 该机器人通过 DialogFlow API V2 集成到 Facebook Messenger 问题是 关
  • 使用 scipy.io 将 python pandas dataframe 转换为 matlab 结构

    我正在尝试使用 scipy io 将 pandas 数据帧保存到 matlab mat 文件 我有以下内容 array1 np array 1 2 3 array2 np array a b c array3 np array 1 01 2
  • 如何使用 google.oauth2 python 库?

    我试图对谷歌机器学习项目的安全预测端点进行简单的休息调用 但它找不到 google oauth2 模块 这是我的代码 import urllib2 from google oauth2 import service account Cons
  • Python 和 Visual Studio Code - 如何在编辑器中运行特定文件?

    我正在使用 Visual Studio Code 和 Python 编写一个小型应用程序 我的应用程序有两个文件 Main py and MyCustomClass py Main py是应用程序的入口点 MyCustomClass py包
  • 在python中读取证书(.crt)和密钥(.key)文件

    因此 我使用 JIRA Python 模块连接到我公司的 JIRA 实例 它要求我为此传递证书和密钥 但是 使用 OpenSSL 模块 我无法读取本地证书和密钥来将其传递给请求 阅读代码如下 import OpenSSL crypto c
  • Python FFmpeg查询rtsp太慢

    目前 我正在尝试使用 python 和 FFmpeg 来查询原始格式为 h264 的 rtsp 数据 直播流视频的信息为 fps 29 分辨率 1280 720 我希望我可以以相同的格式 h264 查询数据并将其放入python队列中以便将

随机推荐