如何并行化行式 Pandas 数据帧的 apply() 方法

2024-02-24

我有以下代码:

import pandas as pd
import time

def enrich_str(str):
        
    val1 = f'{str}_1'
    val2 = f'{str}_2'
    val3 = f'{str}_3'
    time.sleep(3)
    
    return val1, val2, val3
    
def enrich_row(passed_row):
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row


df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

tic = time.perf_counter()
enriched_df = df.apply(enrich_row, col_name='colors', axis=1)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")

enriched_df

需要 15 秒才能获得输出数据帧,如下所示:

现在我想在我的机器上使用多个线程并行化丰富操作。 我探索了很多解决方案,比如Dask, numba,但对我来说,它们似乎都不简单。

然后我偶然发现了multiprocessing图书馆及其pool.imaps()方法。所以我尝试运行以下代码:

import multiprocessing as mp

tic = time.perf_counter()
pool = mp.Pool(5)
result = pool.imap(enrich_row, df.itertuples(), chunksize=1)
pool.close()
pool.join()
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
result

大约需要 2 秒result不是 Pandas 数据框。 我不知道我哪里错了。


我建议您使用悲情叉 https://pypi.org/project/pathos/ of multiprocessing,因为它将更好地处理 DataFrame 的酸洗。imap返回一个迭代器,而不是 DataFrame,因此您必须将其转换回来:

def enrich_row(row_tuple):
    passed_row = row_tuple[1]
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

请注意,我正在使用df.iterrows()它返回元组的迭代器(row_number, row),所以我修改了enrich_row来处理这种格式。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何并行化行式 Pandas 数据帧的 apply() 方法 的相关文章

随机推荐

  • 使用信号量实现 N 进程屏障

    我目前正在为之前迭代的操作系统考试进行培训 我遇到了这个 实施 N 进程屏障 即 是 确保每个流程都完成 他们中的一群人在某个时刻等待着 点在其各自的执行中 对于 其他进程达到他们的 给定点 您有以下内容 可用操作 init sem val
  • 删除核心数据中的重复对象(swift)

    我将对象保存到 JSON 中的核心数据 这是我使用for循环 假设我称之为setup功能 由于用户可能会停止此循环 因此核心数据中保存的对象将是部分的 用户可以重新启动此setup函数 重新启动解析和将对象保存到核心数据的过程 现在 如果我
  • 使用 Linux sort 命令对多个键进行排序

    说我有这个文件 cat a txt c 1002 4 f 1001 1 d 1003 1 a 1001 3 e 1004 2 b 1001 2 我想按第二列排序 然后按第三列排序 第二列是数字 而第三列可以视为字符串 我知道以下命令效果很好
  • 单击任意位置以在 CKEditor 中聚焦

    在 FireFox 中 我可以单击 CKEditor 350px x 250px 中的任意位置 将焦点放在编辑器顶部的单个文本段落上 然而 在 IE6 中 我知道 但我们的客户坚持 我必须直接单击段落顶部以将光标聚焦并随后编辑文本 CKEd
  • STD 集合中引用的生命周期

    对 STD 集合 例如映射 返回的元素的引用有效多久 例如 在这段代码中 struct Employee int salary string name the key map
  • JavaTypeDescriptorRegistry - 找不到所请求的 Java 类的匹配类型描述符

    我有一个项目运行没有任何问题 除了这个警告消息 WARN org hibernate type descriptor java JavaTypeDescriptorRegistry Could not find matching type
  • 从第三方将CSS注入到iframe中

    我们可以将一堆 CSS 文件从第三方 例如托管广告的 OAS 注入到 iframe 中吗 如果可能的话 我们将不胜感激 通过使用 jQuery 选择器 您应该能够做到这一点 但是 对 iframe 内容不应有任何限制 即它应该来自同一域 对
  • 使用 StreamReader 检查文件是否包含字符串

    我有一个字符串是args 0 到目前为止 这是我的代码 static void Main string args string latestversion args 0 create reader open file using Strea
  • 如何从无限字节流中读取 UTF-8 字符 - C#

    通常 要从字节流中读取字符 您可以使用 StreamReader 在此示例中 我从无限流中读取由 r 分隔的记录 using var reader new StreamReader stream Encoding UTF8 var mess
  • 如何修复 android Adob​​e SDK 工具中的此错误?

    我已将 Adob e Editor 集成到我的 Android 应用程序中 它工作正常 更新我的 Android Studio 后 它崩溃了 我在gradle中添加了 android compileSdkVersion 26 buildTo
  • 如何释放 boost::mpi::request?

    我正在尝试让 MPI 断开通信器 这是一件很棘手的事情 我在下面整理了一个演示 我有相同想法的两个版本 侦听 int 一个使用 MPI IRecv 另一个使用 boost mpi request 您会注意到 在此程序上使用 mpiexec
  • 单击单元格时的操作

    H 我是 VBA 新手 这可能是一个太简单的问题 但我正在努力使用 VBA 当单元格 1 1 被点击时 因为它有1 消息框会显示 hi Sub test click action when cell 1 1 is clicked and i
  • 如何动态更改黑莓标签字段的字体颜色?

    我有一个标签字段和三个按钮 名称分别为红色 黄色 蓝色 如果我单击红色按钮 则标签字段字体颜色应更改为红色 同样 如果我单击黄色按钮 则字体颜色应更改为黄色 同样 根据按钮颜色 标签字段中的字体颜色应发生变化 谁能告诉我该怎么做 Label
  • Laravel Session 检测到一个域、多个数据库

    我读过几篇文章 主题 例如this https stackoverflow com questions 31847054 how to use multiple databases in laravel this https medium
  • Git 存储库太大

    我有一个项目 其中包含大约 12MB 的代码和资产 我一直在使用 Git 跟踪它 并且刚刚注意到我的 git文件夹现在刚刚超过 1 83GB 它由几个小文件组成 然后是一个包文件 约占该文件夹的 1 82GB 我已经跑了git gc agg
  • 检测android中home按钮的点击事件(应用程序启动器图标)

    如何识别android中应用程序启动器图标中的点击事件 一旦用户单击此图标 我需要转到主屏幕 例如 假设这是清单文件
  • WPF 中的图像可见性问题 - 按下按钮时不显示

    我正在用 C 开发一个 WPF 应用程序 其中有一个按钮可以切换图像的可见性 我已按照说明进行操作并实现了以下代码来处理按钮单击 XAML
  • 保存到服务器后图像质量下降。

    我正在捕获图像 并将其保存到服务器路径中 它工作正常 捕获的图像看起来质量不错 但将图像保存到服务器后 其质量下降 这是我的代码 这是我的活动 import java io BufferedReader import java io Byt
  • 无法在 android studio 的模拟器中启动 AVD。参数无效

    我在 Android Studio 2 1 2 中遇到模拟器问题 当我尝试启动 AVD 时 我收到一条消息 无法在模拟器中启动 AVD Output 哈克斯已启用 该虚拟机所需的内存超出了驱动程序限制 Hax ram size 0x6000
  • 如何并行化行式 Pandas 数据帧的 apply() 方法

    我有以下代码 import pandas as pd import time def enrich str str val1 f str 1 val2 f str 2 val3 f str 3 time sleep 3 return val