Python并行处理数据多进程/多线程,榨干你的CPU

2023-10-31

前言

  最近在公司实习,给整了个活,像是数学建模一样的数据分析的活,目标是在几个互相有关联的大表中找出满足某条件的那些业务,其中第一步就是把两个表拼起来,就叫它们A和B吧,省略拼表过程中需要的逻辑判断。

串行

  两个长为M和N的表,在判断中需要一个M*N级别的判断,也就是N2的复杂度,当M和N都很大时,比如3w,那还是得花点时间的,比如七八分钟。所以得想办法加速。

使用concurrent.futures的线程池

  既然是在一个py进程里干的活,那自然就想到能不能多开几个线程,比如12个,反正每个判断是独立的,把一个表尽量平均拆成12份,让每个线程去做那一份的判断,最后再把结果返回给主进程进行拼接。这里我先使用了Python的concurrent.futures的线程池来看看效果。
  concurrent.futures 是 Python 的一个模块,它提供了一个高级接口,用于异步执行可调用对象,也可以理解为并发。异步执行可以使用线程(使用 ThreadPoolExecutor)或单独的进程(使用 ProcessPoolExecutor)来执行。两者都实现了相同的接口,由抽象的 Executor 类定义 。

  一个大致的使用框架:

import pandas as pd
import concurrent.futures

def sub_process(data: list) -> list:
	res = []
	# 处理
	return res

df = pd.read_csv('large_data.csv')
num_threads = 12
chunk_size = len(df) // num_threads

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    for i in range(num_threads):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < num_threads - 1 else len(df)
        data_chunk = df.iloc[start:end]
        future = executor.submit(sub_process, data_chunk)
        results.append(future)

final_result = []
for f in results:
    final_result.extend(f.result())

使用multiprocessing

  使用了上面这个后,效果并不好,CPU占用就没超过40%,肯定是那里出现了问题,查了一下可能是GIL锁的问题,它是一个互斥锁(mutex),用于防止多个本地线程同时执行 Python 字节码。这个锁主要是因为 CPython 的内存管理不是线程安全的,所以需要这个锁来保证线程安全。
  无奈,只能再尝试其他的,比如multiprocessing,既然不让我的多个线程同时执行,那我开多个进程总行了吧,无非就是多用点内存,多复制几个其他要用到的数据给进程。需要注意的是, 使用这个的话,在循环中启动进程时,没法接收到子进程处理后的结果返回值,所以需要在主进程中创建一个数据结构,让子进程执行完了把结果往里面放,完事儿再让主进程去接收,正好multiprocessing中就有队列的类,可以直接用。推荐进程数和CPU的超线程数一样。试过如果和物理核数一样的话,还是不能拉满CPU,总耗时也比不过设置为线程数。
  很喜欢看着CPU被拉满时任务管理器框框被占满的样子。

from multiprocessing import Process, Queue

def worker(pro_id, data_slice, q):
    result = [sum(data_slice)] * pro_id
    q.put((pro_id, result))

if __name__ == '__main__':
    data = list(range(100))
    num_worker = 12
    slice_size = len(data) // num_worker
    q = Queue()
    pool = []
    for i in range(num_worker):
        start = i * slice_size
        end = (i + 1) * slice_size if i < num_worker - 1 else len(data)
        data_slice = data[start:end]
        p = Process(target=worker, args=(i, data_slice, q))
        p.start()
        pool.append(p)
    result_list = []
    for _ in range(num_worker):
        result_list.append(q.get())
    for p in pool:
        p.join()
    # 如果不在意结果的顺序的话,可以不用排序,队列保存时也可去掉id项。
    result_list.sort(key=lambda x: x[0])
    final_result = [x[1] for x in result_list]
    print(final_result)

没了,主打一个短小精悍,浓缩的就是精华,希望能对你有所帮助

  其实如果要并发的任务只是纯计算的话,用上一个方法更加适合。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python并行处理数据多进程/多线程,榨干你的CPU 的相关文章

  • 如何将经度和纬度转换为国家或城市?

    我需要将经度和纬度坐标转换为国家或城市 python中有这样的例子吗 提前致谢 我使用谷歌的API from urllib2 import urlopen import json def getplace lat lon url http
  • 使用 GeoDjango 在坐标系之间进行转换

    我正在尝试将坐标信息添加到我的数据库中 添加django contrib gis支持我的应用程序 我正在写一个south数据迁移 从数据库中获取地址 并向 Google 询问坐标 到目前为止 我认为我最好的选择是使用geopy为了这 接下来
  • 同情因子简单关系

    我在 sympy 中有一个简单的因式分解问题 无法解决 我在 sympy 处理相当复杂的积分方面取得了巨大成功 但我对一些简单的事情感到困惑 如何得到 phi 2 2 phi phi 0 phi 0 2 8 因式分解 phi phi 0 2
  • 运行源代码中包含 Unicode 字符的 Python 2.7 代码

    我想运行一个在源代码中包含 unicode utf 8 字符的 Python 源文件 我知道这可以通过添加评论来完成 coding utf 8 在一开始的时候 但是 我希望不使用这种方法来做到这一点 我能想到的一种方法是以转义形式编写 un
  • 01 无效令牌[重复]

    这个问题在这里已经有答案了 嘿 学习 python3有一段时间了 遇到字典和dictionary name get 方法并尝试获取随机键值 问题 data data get key 1 它有效并且返回 1 但如果我使用data get ke
  • Python:如何重构循环导入

    我有件事可以帮你做engine setState
  • 使用 Pandas 查找自滚动高点以来的周期数

    我在 Pandas 中使用rolling max函数 http pandas pydata org pandas docs stable computation html moving rolling statistics moments
  • 如何通过facebook-sdk python api获取用户帖子?

    我使用 facebook jssdk 授权我的应用程序读取用户个人资料和用户帖子 FB login function response scope user status user likes user photos user videos
  • 获取 zeep.exceptions.ValidationError:缺少与 suds 一起使用的方法的元素

    我正在移植开发的代码suds 0 6到zeep 2 4 0 以前的泡沫代码 client Client WSDLfile proxy proxy faults True config client factory create perUse
  • keras 预测内存交换无限期增加

    我使用keras实现了一个分类程序 我有一大组图像 我想使用 for 循环来预测每个图像 然而 每次计算新图像时 交换内存都会增加 我尝试删除预测函数内部的所有变量 并且我确信该函数内部存在问题 但内存仍然增加 for img in ima
  • 超时时杀死或终止子进程?

    我想尽可能快地重复执行子进程 然而 有时这个过程会花费太长的时间 所以我想杀死它 我使用 signal signal 如下所示 ppid pipeexe pid signal signal signal SIGALRM stop handl
  • 如何在 Tkinter 的 Button 小部件中创建多个标签?

    我想知道如何在 Tkinter 中创建具有多个标签的按钮小部件 如下图所示 带有子标签的按钮 https i stack imgur com jOZRw jpg正如您所看到的 在某些按钮中有一个子标签 例如按钮 X 有另一个小标签 A 我试
  • Scrapy - 不会爬行

    我正在尝试运行递归爬行 由于我编写的爬行不能正常工作 因此我从网络上提取了一个示例并进行了尝试 我真的不知道问题出在哪里 但是爬行没有显示任何错误 谁能帮我这个 另外 是否有任何逐步调试工具可以帮助理解蜘蛛的爬行流程 非常感谢任何与此相关的
  • 从 subprocess.Popen 获取整个输出

    我通过调用 subprocess Popen 得到了一个有点奇怪的结果 我怀疑这与我对 Python 的陌生有很大关系 args cscript USERPROFILE tools jslint js USERPROFILE tools j
  • 对 pandas 数据框中的每一列应用函数

    我如何以更多的熊猫方式编写以下函数 def calculate df columns mean self df means for column in df columns columns tolist cleaned data self
  • Synapse Notebook 参考 - 使用参数从另一个笔记本调用 Synapse Notebook

    我有一个带有参数的突触笔记本 我试图从另一个笔记本调用该笔记本 我正在使用 run 命令 我应该如何将参数从基本笔记本传递到正在调用的笔记本 另外 对我来说 上述答案不起作用 作为对此问题的单独解决方案 下面是一个答案 打开笔记本并转到最右
  • 升级后 pip 损坏

    我做了 pip install U easyinstall 然后 pip install U pip 来升级我的 pip 但是 当我尝试使用 pip 时 我现在收到此错误 root d8fb98fc3a66 which pip usr lo
  • 在 Gensim 中通过 ID 检索文档的字符串版本

    我正在使用 Gensim 进行一些主题建模 并且已经达到使用 LSI 和 tf idf 模型进行相似性查询的程度 我取回 ID 集和相似点 例如 299501 0 64505910873413086 如何获取与 ID 在本例中为 29950
  • Elastic Beanstalk 上的 Django + MySQL - 查询 MySQL 时出错

    当我在 Elastic beanstalk 上托管的 Django 应用程序上查询 MySQL 时 出现错误 错误说 admin login 处出现操作错误 1045 用户 adminDB 172 30 23 5 的访问被拒绝 使用密码 Y
  • 在 pip 中为 Flask 应用程序构建 docker 映像失败

    from alpine latest RUN apk add no cache python3 dev pip3 install upgrade pip WORKDIR backend COPY backend RUN pip no cac

随机推荐

  • 众享比特董事长严挺:数字藏品在国内有三大发展趋势

    2022年11月2日 巴比特主办的温州元宇宙月系列活动之 数字藏品 虚拟人 元宇宙营销新策略 论坛在温州召开 众享比特董事长严挺 众享链网发起人严挺出席论坛并进行主题为 元宇宙在国内落地的一些实践分享 的演讲 温州元宇宙主题月秉承 拥抱数字
  • Hyperledger fabric2.4 搭建自己的网络

    1 使用cryptogen工具生成证书 1 1 将fabric samples bin目录下的二进制文件复制到 usr local bin目录 以便全局使用这些命令 cd fabric samples bin cp usr local bi
  • 那些你可能遇到的 Linux 命令?什么,你还不知道?赶紧收藏?完善中!

    文章目录 一 Linux 进程 1 通过进程名查找进程号 1 1 ps aux ps ef diff 1 2 ps aux ps aux 什么 它们不一样 1 3 grep awk 取出进程号 取出进程号并 Kill 2 通过进程号查看进程
  • LINUX应用和驱动交互的四种方式

    Linux开发中 应用读取数据时往往会遇到驱动尚未获得有效数据的情况 所以需要采用适合的同步方式 1 非阻塞方式 非阻塞方式 顾名思义就是不管数据是否准备好 驱动都会返回结果 采用这种方式就需要应用不停地重复查询 查询硬件的线程就会一直都占
  • Centos7 升级openssl-1.1.1s及openssh-9.1p1(附脚本)

    主要是上个月openssl出现了漏洞 因此要对服务器的进行升级 建议如果没问题还是尽量别升级 主要步骤是2 脚本内容也只包含升级 1和3是开启和关闭telnet 不建议使用telnet 1 安装和启动telnet 实际中我没使用telnet
  • 数据结构前言

    一 什么是数据结构 数据结构是计算机存储 组织数据的方式 数据结构是指相互之间存在一种或多种特定关系的数据元素的集合 上面是百度百科的定义 通俗的来讲数据结构就是数据元素集合与数据元素集合或者数据元素与数据元素之间的组成形式 举个简单明了的
  • Web Worker 用法

    一 概述 JavaScript 语言采用的是单线程模型 也就是说 所有任务只能在一个线程上完成 一次只能做一件事 前面的任务没做完 后面的任务只能等着 随着电脑计算能力的增强 尤其是多核 CPU 的出现 单线程带来很大的不便 无法充分发挥计
  • DBaaS体系及特性

    用户对云计算的交付能力已经不再满足于单纯的基础设施 IaaS 交付 他们希望数据中心中的更多传统IT 服务能以云服务模式进行交付 其中最为迫切的就是数据库 将数据库以云服务模式交付给用户 就是数据库即服务 DBaaS 也称云数据库 传统数据
  • VirusTotal——您身边的企业安全专家

    本文由 Cloud Ace 整理发布 Cloud Ace 是谷歌云全球战略合作伙伴 拥有 300 多名工程师 也是谷歌最高级别合作伙伴 多次获得 Google Cloud 合作伙伴奖 作为谷歌托管服务商 我们提供谷歌云 谷歌地图 谷歌办公套
  • 基于linux环境下安装jre + eclipse cdt

    博客的排版真的好糟糕 请看点击打开链接 一 下载所用到的软件安装包 1 java运行环境 jre 8u112 linux x64 tar gz 2 elipse cdt版本 eclipse cpp neon 2 linux gtk x86
  • pyahocorasick和pyltp包安装方法

    1 安装pyahocorasick 包 pip install pyahocorasick i Simple Index 这个需要VS环境 如果命令行安装提示没有VS环境可以进入 用VS命令行执行pip命令 即可安装成功 2 安装pyltp
  • Vue中el-dialog的用法

    写入HTML
  • 57 openEuler搭建Mariadb数据库服务器-管理数据库用户

    文章目录 57 openEuler搭建Mariadb数据库服务器 管理数据库用户 57 1 创建用户 57 2 查看用户 57 3 修改用户 57 3 1 修改用户名 57 3 2 修改用户示例 57 3 3 修改用户密码 57 3 4 修
  • 关于javaSE8之后的默认方法的整理

    网络中的说法 关于java8接口中默认方法的使用 8在接口中引入了默认方法 通过在方法前加上default关键字就可以在接口中写方法的默认实现 有点类似于C 中的多继承 但是当多个接口或父类中有相同签名的方法时 会引发一些问题 经过实验得出
  • 射频电路学习之滤波电路

    文章目录 前言 一 滤波电路的分类 二 滤波电路的主要参数 1 插入损耗 IL 2 波纹系数 3 频带宽度 4 矩形系数 5 阻带抑制 三 滤波电路设计 1 集总参数滤波电路 巴特沃斯滤波电路 切比雪夫滤波电路 归一化滤波电路的变换 电路变
  • Topaz Video AI for mac(视频增强和修复工具)

    Topaz Video AI for Mac是一款视频增强和修复工具 采用了人工智能技术 可以提高视频的清晰度 降噪 去抖动和插帧等 这款软件支持多种视频格式 包括MP4 MOV AVI等 使用Topaz Video AI for Mac
  • Python基础—面向对象(超详版)

    Python基础 面向对象 面向对象简介 什么是面向对象 类与对象 父类与子类 面向对象的特性 单继承与多继承 单继承 多继承 多层继承 封装 多态 重写与调用 python重写 python调用 super函数 前言 个人主页 以山河作礼
  • VS 使用System.Console打印时输出窗口不显示

    在项目属性中勾选 启用visual studio承载进程 可以在输出窗口中的调试打印信息中显示System Console打印信息
  • 安装PYG

    目录 1 通过Anaconda安装 2 通过pip安装 3 尝试历程 参考 1 通过Anaconda安装 conda install pyg c pyg c conda forge 2 通过pip安装 首先通过如下命令获取系统torch和C
  • Python并行处理数据多进程/多线程,榨干你的CPU

    前言 最近在公司实习 给整了个活 像是数学建模一样的数据分析的活 目标是在几个互相有关联的大表中找出满足某条件的那些业务 其中第一步就是把两个表拼起来 就叫它们A和B吧 省略拼表过程中需要的逻辑判断 串行 两个长为M和N的表 在判断中需要一