多处理写入 pandas 数据框

2023-11-27

所以我试图用下面的代码做的是读取列表列表并将它们放入名为的函数中checker然后有log_result处理函数的结果checker。我尝试使用多线程来执行此操作,因为变量名称rows_to_parse实际上有数百万行,因此使用多个内核应该会大大加快此过程。

目前的代码无法运行并且会使 Python 崩溃。

我的担忧和问题:

  1. 想要变量中保存的现有 dfdf以维持 索引整个过程,否则log_result将会得到 不知道哪一行需要更新。
  2. 我非常确定apply_async不合适 多处理功能来履行这项职责,因为我相信 计算机读写 df 的顺序可能会损坏它???
  3. 我认为可能需要设置一个队列来进行写入和读取df但我不确定我将如何去做。

感谢您的帮助。

import pandas as pd
import multiprocessing
from functools import partial

def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]



def log_result(result, dataf):
    if len(result) == 1: #
        dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)


def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
    pool.close()
    pool.join()



if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                    ["NO","Q","11022003","15081999",3],
                    ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                    ['YES', 'W', '17061992', '26032012', 6],
                    ['YES', 'G', '01122006', '07082014', 2],
                    ['YES', 'N', '06081992', '21052008', 9],
                    ['YES', 'Y', '18051995', '24011996', 6],
                    ['NO', 'Q', '11022003', '15081999', 3],
                    ['NO', 'O', '20112004', '28062008', 0],
                    ['YES', 'R', '10071994', '03091996', 8],
                    ['NO', 'C', '09091998', '22051992', 1],
                    ['YES', 'Q', '01051995', '02012000', 3],
                    ['YES', 'Q', '26022015', '26092007', 5],
                    ['NO', 'F', '15072002', '17062001', 8],
                    ['YES', 'I', '24092006', '03112003', 2],
                    ['YES', 'A', '22082010', '03012001', 9],
                    ['YES', 'I', '15072016', '30092005', 7],
                    ['YES', 'Y', '08111999', '02022006', 3],
                    ['NO', 'V', '04012016', '10061996', 1],
                    ['NO', 'I', '21012003', '11022001', 6],
                    ['NO', 'P', '06041992', '30111993', 6],
                    ['NO', 'W', '30081992', '02012016', 6]]


    apply_async_with_callback(rows_to_parse, df)

在多处理中像这样更新数据帧是行不通的:

dataf = dataf.append(new_row,ignore_index=True)

一方面,这是非常低效的(每次追加都是 O(n),所以总共是 O(n^2)。首选方法是在一次传递中将一些对象连接在一起。

另一方面,更重要的是,dataf 不会为每次更新锁定,因此不能保证两个操作不会发生冲突(我猜这会导致 python 崩溃)。

最后,append没有作用到位,所以变量dataf一旦回调完成就被丢弃!并且没有对父级进行任何更改dataf.


我们可以使用多重处理列表 or a dict。如果您不关心顺序,则使用 list ;如果您关心顺序,则使用 dict (例如枚举),因为您必须注意,异步返回的值不是按照明确定义的顺序返回的。
(或者我们可以创建一个自己实现 Lock 的对象,请参阅伊莱·班德斯基.)
因此进行了以下更改:

df = pd.DataFrame(existing_data,columns=cols)
# becomes
df = pd.DataFrame(existing_data,columns=cols)
d = MultiProcessing.list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)

现在,一旦异步完成,您就拥有了 DataFrame 的 MultiProcessing.list。您可以连接这些(和ignore_index)以获得所需的结果:

pd.concat(d, ignore_index=True)

应该做到这一点。


注意:在每个阶段创建 newrow DataFrame 的效率也低于让 pandas 一次性将列表列表直接解析为 DataFrame 的效率。希望这是一个玩具示例,实际上您希望您的块相当大,以便通过多重处理获得胜利(我听说 50kb 作为经验法则......),一次一行永远不会成为一个在这里获胜。


另外:您应该避免在代码中使用全局变量(如 df),在函数中传递它们会更干净(在本例中,作为检查器的参数)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多处理写入 pandas 数据框 的相关文章

  • 如何将条目中的部分文本加粗并更改其背景颜色?

    我正在创建一个基于 Tkinter 的 GUI 它有一个 Entry 小部件 我想将其文本的一部分加粗并更改其背景颜色 但我不知道我该怎么做 如果我使用文本小部件 我可以只使用标签 但看起来它们不能与条目小部件一起使用 此代码使用文本小部件
  • NLTK、搭配问题:需要解包的值太多(预期为 2)

    我尝试使用 NLTK 检索搭配 但出现错误 我使用内置的古腾堡语料库 I wrote alice nltk corpus gutenberg fileids 7 al nltk corpus gutenberg words alice al
  • Python 中 genfromtxt() 的可变列数?

    我有一个 txt具有不同长度的行的文件 每一行都是代表一条轨迹的一系列点 由于每条轨迹都有自己的长度 因此各行的长度都不同 也就是说 列数从一行到另一行不同 据我所知 genfromtxt Python 中的模块要求列数相同 gt gt g
  • 使用正则表达式解析 Snort 警报文件

    我正在尝试使用 Python 中的正则表达式从 snort 警报文件中解析出源 目标 IP 和端口 和时间戳 示例如下 03 09 14 10 43 323717 1 2008015 9 ET MALWARE User Agent Win9
  • 如何使用 openpyxl 对工作簿中的 Excel 工作表/选项卡进行排序

    我需要按字母数字对工作簿中的选项卡 工作表进行排序 我在用openpyxl https openpyxl readthedocs io en default 操作工作表 您可以尝试排序workbook sheets list workboo
  • VSCode pytest 测试发现失败

    Pytest 测试发现失败 用户界面指出 Test discovery error please check the configuration settings for the tests 输出窗口显示 Test Discovery fa
  • 如何为多组精灵创建随机位置?

    我尝试使用 blit 和 draw 方法进行 for 循环 并为 PlayerSprite 和 Treegroup 使用不同的变量 for PlayerSprite in Treegroup surface blit PlayerSprit
  • 使用 dict 在数据框中查找行

    df pd DataFrame a 1 2 3 b 4 5 6 produces a b 0 1 4 1 2 5 2 3 6 给定一个字典 d a 2 b 5 我将如何提取数据帧中字典的键值与所有列值匹配的行 所以在这种情况下 a b 1
  • 多线程——更快的方法?

    我有一堂有吸气剂的课程getInt 和一个二传手setInt 在某个领域 比如说领域 Integer Int 一个类的一个对象 比如说SomeClass The setInt 这里是同步的 getInt isn t 我正在更新的值Int来自
  • FastText - 由于 C++ 扩展未能分配内存,无法加载 model.bin

    我正在尝试使用 FastText Python APIhttps pypi python org pypi fasttext https pypi python org pypi fasttext虽然 据我所知 此 API 无法加载较新的
  • 在 pip.conf 中指定多个可信主机

    这是我尝试在我的中设置的 etc pip conf global trusted host pypi org files pythonhosted org 但是 它无法正常工作 参考 https pip pypa io en stable
  • 从BackgroundWorker线程更新图像UI属性

    在我正在编写的 WPF 应用程序中 我有一个 TransformedBitmap 属性 该属性绑定到 UI 上的 Image 对象 每当我更改此属性时 图像就会更新 因此显示在屏幕上的图像也会更新 为了防止在检索下一张图像时 UI 冻结或变
  • 是否可以写一个负的python类型注释

    这可能听起来不合理 但现在我需要否定类型注释 我的意思是这样的 an int Not Iterable a string Iterable 这是因为我为一个函数编写了一个重载 而 mypy 不理解我 我的功能看起来像这样 overload
  • Google App Engine 中的自定义身份验证

    有谁知道或知道我可以在哪里学习如何使用 Python 和 Google App Engine 创建自定义身份验证流程 我不想使用 Google 帐户进行身份验证 并且希望能够创建自己的用户 如果不是专门针对 Google App Engin
  • PyQt 中的线程和信号问题

    我在 PyQt 中的线程之间进行通信时遇到一些问题 我使用信号在两个线程 发送者和监听者 之间进行通信 发送者发送消息 期望被监听者接收 但是 没有收到任何消息 谁能建议可能出了什么问题 我确信这一定很简单 但我已经环顾了几个小时但没有发现
  • 具有指定置信区间的 Seaborn 条形图

    我想在 Seaborn 条形图上绘制置信区间 但我已经计算出置信区间 如何让 Seaborn 绘制我的置信区间而不是尝试自行计算它们 例如 假设我有以下 pandas DataFrame x pd DataFrame Group 1 0 5
  • 使用“pythonw”(而不是“python”)运行应用程序时找不到模块

    我尝试了这个最小的例子 from flask import Flask app Flask name app route def hello world return Hello World if name main app run deb
  • 如何使用 Django (Python) 登录表单?

    我在 Django 中构建了一个登录表单 现在我遇到了路由问题 当我选择登录按钮时 表单不会发送正确的遮阳篷 我认为前端的表单无法从 查看 py 文件 所以它不会发送任何 awnser 并且登录过程无法工作 该表单是一个简单的静态 html
  • 如何在SqlAlchemy中执行“左外连接”

    我需要执行这个查询 select field11 field12 from Table 1 t1 left outer join Table 2 t2 ON t2 tbl1 id t1 tbl1 id where t2 tbl2 id is
  • 使用 numpy 加速 for 循环

    下一个 for 循环如何使用 numpy 获得加速 我想这里可以使用一些奇特的索引技巧 但我不知道是哪一个 这里可以使用 einsum 吗 a 0 for i in range len b a numpy mean C d e f b i

随机推荐

  • mysql group by 返回最小值并获取对应行数据

    我有一个像这样的数据表 PK table merchantName price Product 1 argos 7 4 2 comet 3 4 1 Dixon 1 3 1 argos 10 4 我希望在mysql中选择产品的最低价格和相应的
  • 将 ajax 结果附加到 div

    我正在对 IMDb API 进行 ajax 调用 以获取 肖申克的救赎 的电影数据 我希望将这些数据放入我创建的 div 中 div div 我当前的js代码 init function init ajax dataType json ur
  • 如何测量Java线程的执行时间?

    我想测量Java中线程的执行时间 现在我正在监视线程的开始和结束时间 但我认为它不太准确 因为线程在执行期间可能会被挂起 Java MXBeans 可以提供每线程 CPU 时间 import java lang management Man
  • “@+android:id/title”是什么意思?

    正常情况下 我们应该使用 id 定义一个 id 并使用 id引用一个 id 今天我发现 android id title in apps settings res layout preferenc progress xml 如何理解它以及如
  • 比较相等的日期时间返回 false

    我有一个关于如何在 C 中比较 存储日期时间的查询 考虑以下代码 var createdDate DateTime Now using cr new LanguageDictionaryRepository ds cr Add new Sy
  • 在 Rcpp 中构造 3D 数组

    我正在尝试使用提供的维度列表将 1D 数组映射到 3D 数组 这是我的组件 SEXP data my 1D array I can initialise new 3D vector in the following way NumericV
  • Angular ngx-mat-select-search 自定义组件

    我正在尝试使用 ngx mat select search 组件在我的应用程序中放置一个带有搜索栏的 mat select 样式下拉菜单 https www npmjs com package ngx mat select search 我
  • 如何在 Liferay portlet 中设置 Cookie?

    我在尝试设置会话 cookie 时遇到问题Liferay 6 0 portlet 我希望能够向客户端浏览器设置一个 cookie 以存储用于 linkedin 身份验证的应用程序密钥 然后其他 portlet 可以在其中检索它 我可以使用以
  • 如何在 ReadTheDocs 导航栏中链接生成的索引页面?

    我正在 ReadTheDocs 上使用 Sphinx 主题创建我的文档 构建过程会生成一个 genindex html 文件 可以通过以下方式引用该文件 Link to the ref genindex page 这会创建 链接到Index
  • 了解 Dagger 2 中的范围

    我在 Dagger 2 中遇到了与范围相关的错误 我正在尝试了解如何解决它 我有一个CompaniesActivity这表明公司 当用户选择一个项目时 所选公司的员工会显示在EmployeesActivity 当用户选择一名员工时 她的详细
  • 如何在 yii2 模态窗口中使用 pjax 更新小部件

    我的模态窗口中有两个 ActiveForm 提交第一个表单后 我需要更新第二个表单并保持模态 据我了解 pjax 可以处理这个问题 但无法让它正常工作 在 form php 中 我有 ActiveForm 和应该更新的小部件 need to
  • 使用 PdfMiner 和 PyPDF2 合并列提取文本

    我正在尝试使用 pdfMiner 解析 pdf 文件文本 但提取的文本被合并 我正在使用以下链接中的 pdf 文件 编辑 链接已损坏 指向潜在的恶意软件 我擅长任何类型的输出 文件 字符串 这是为我返回提取的文本作为字符串的代码 但由于某种
  • 合并两个 jQuery 对象

    我喜欢有一个或多个 jQuery 对象的返回值 以将它们添加到 var f function var a div class a div do someting awesome return a 带有两个 s 的正确示例 var f fun
  • Rails:有没有办法告诉请求的来源?

    谷歌分析告诉我流量的来源是什么 搜索引擎 Facebook 等 有没有办法从每个请求获取此信息request对象或其他 我将不胜感激任何帮助 request referrer会给你HTTP 引荐来源网址 value
  • Azure Web应用程序:堆栈设置

    I can set my stack for a webapp through the portal 我通过 ARM 模板部署基础设施 apiVersion 2015 08 01 type Microsoft Web sites name
  • 如何清除表单中所有文本框的文本?

    private void CleanForm foreach var c in this Controls if c is TextBox TextBox c Text String Empty 上面这个方法不起作用 控件没有被清除 它编译
  • .Net WebDAV 服务器

    我正在寻找在 ASP Net 中实现 WebDAV 服务器 该应用程序将部署到 IIS 6 我见过一些提供此功能的框架 但我似乎无法确定它们如何在不 显然 修改 IIS 设置的情况下完成此功能 我的具体问题是如何配置 IIS 和 ASP N
  • 设置土耳其语文本转语音[重复]

    这个问题在这里已经有答案了 我正在开发文本到语音应用程序 我想将土耳其语设置为这样 tts setLanguage Locale TR 但这在android中不可用 这种添加方式是错误的还是有不同的方法将土耳其语添加到文本到语音中 任何帮助
  • 如何在多项目 Android 构建中排除重复的 C 共享库 (.so)?

    当使用两个库模块构建父项目时 我遇到 重复文件 冲突 这两个模块使用相同的库模块libc shared so共享库 NOTE 请不要认为这是一个 重复的问题 我读过几篇相关的文章 它们帮助我走到了这一步 但是 没有帖子提供适用于我的案例的答
  • 多处理写入 pandas 数据框

    所以我试图用下面的代码做的是读取列表列表并将它们放入名为的函数中checker然后有log result处理函数的结果checker 我尝试使用多线程来执行此操作 因为变量名称rows to parse实际上有数百万行 因此使用多个内核应该