一文讲透Python线程池ThreadPoolExecutor

2023-12-04

01 初识

Python 中已经有了 threading 模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是 20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的 ,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行。

这就是线程池的思想(当然没这么简单),但是自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。从 Python3.2 开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor ProcessPoolExecutor 两个类,实现了对 threading multiprocessing 进一步抽象(这里主要关注线程池),不仅可以帮我们 自动调度线程 ,还可以做到:

  • 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。

  • 当一个线程完成的时候,主线程能够立即知道。

  • 让多线程和多进程的编码接口一致。

02 实例

简单使用

from concurrent.futures import ThreadPoolExecutor

import time

# 参数times用来模拟网络请求的时间

def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

# 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞

task1 = executor.submit(get_html, (3))

task2 = executor.submit(get_html, (2))

# done方法用于判定某个任务是否完成

print(task1.done())

# cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功

print(task2.cancel())time.sleep(4)print(task1.done())

# result方法可以获取task的执行结果

print(task1.result())

# 执行结果# False  

# 表明task1未执行完成# False  

# 表明task2取消失败,因为已经放入了线程池中

# get page 2s finished# get page 3s finished# True  

# 由于在get page 3s finished之后才打印,所以此时task1必然完成了

# 3     

# 得到task1的任务返回值
  • ThreadPoolExecutor 构造实例的时候,传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

  • 使用 submit 函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

  • 通过 submit 函数返回的任务句柄,能够使用 done() 方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在 task1 提交后立刻判断, task1 还未完成,而在延时4s之后判断, task1 就完成了。

  • 使用 cancel() 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是 task1 task2 还在排队等候,这是时候就可以成功取消。

  • 使用 result() 方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用 as_completed 方法一次取出所有任务的结果。

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

# 参数times用来模拟网络请求的时间

def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url

all_task = [executor.submit(get_html, (url)) for url in urls]



for future in as_completed(all_task):

   data = future.result()

   print("in main: get page {}s success".format(data))#

 执行结果

# get page 2s finished

# in main: get page 2s success

# get page 3s finished

# in main: get page 3s success

# get page 4s finished

# in main: get page 4s success

as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield 这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出, 先完成的任务会先通知主线程

map

除了上面的 as_completed 方法,还可以使用 executor.map 方法,但是有一点不同。

from concurrent.futures import ThreadPoolExecutor

import time



# 参数times用来模拟网络请求的时间def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url



for data in executor.map(get_html, urls):

   print("in main: get page {}s success".format(data))

# 执行结果

# get page 2s finished

# get page 3s finished

# in main: get page 3s success

# in main: get page 2s success

# get page 4s finished

# in main: get page 4s success

使用 map 方法,无需提前使用 submit 方法, map 方法与 python 标准库中的 map 含义相同,都是将序列中的每个元素都执行同一个函数。上面的代码就是对 urls 的每个元素都执行 get_html 函数,并分配各线程池。可以看到执行结果与上面的 as_completed 方法的结果不同, 输出顺序和 urls 列表的顺序相同 ,就算2s的任务先执行完成,也会先打印出3s的任务先完成,再打印2s的任务完成。

wait

wait 方法可以让主线程阻塞,直到满足设定的要求。

​from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETEDimport time

# 参数times用来模拟网络请求的时间

def get_html(times):

    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url

all_task = [executor.submit(get_html, (url)) for url in urls]

wait(all_task, return_when=ALL_COMPLETED)print("main")

# 执行结果 

# get page 2s finished

# get page 3s finished

# get page 4s finished

# main

​

wait 方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件 return_when 默认为 ALL_COMPLETED ,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出 main 。等待条件还可以设置为 FIRST_COMPLETED ,表示第一个任务完成就停止等待。

03 源码分析

cocurrent.future 模块中的 future 的意思是 未来对象 ,可以把它理解为 一个在未来完成的操作 ,这是异步编程的基础 。在线程池 submit() 之后,返回的就是这个 future 对象,返回的时候任务并没有完成,但会在将来完成。也可以称之为task的返回容器,这个里面会存储task的结果和状态。那 ThreadPoolExecutor 内部是如何操作这个对象的呢?

下面简单介绍 ThreadPoolExecutor 的部分代码:

1.init方法

图片

init 方法中主要重要的就是任务队列和线程集合,在其他方法中需要使用到。

2.submit方法

图片

submit 中有两个重要的对象, _base.Future() _WorkItem() 对象, _WorkItem() 对象负责运行任务和对 future 象进行设置,最后会将 future 对象返回,可以看到整个过程是立即返回的,没有阻塞。

3.adjust_thread_count方法

图片

这个方法的含义很好理解,主要是创建指定的线程数。但是实现上有点难以理解,比如线程执行函数中的weakref.ref,涉及到了弱引用等概念,留待以后理解。

4._WorkItem对象

图片

_WorkItem 对象的职责就是执行任务和设置结果。这里面主要复杂的还是 self.future.set_result(result)

5.线程执行函数--_worker

图片

这是线程池创建线程时指定的函数入口,主要是从队列中依次取出task执行,但是函数的第一个参数还不是很明白。留待以后。

04 总结

  • future的设计理念很棒,在线程池/进程池和携程中都存在future对象,是异步编程的核心。

  • ThreadPoolExecutor 让线程的使用更加方便,减小了线程创建/销毁的资源损耗,无需考虑线程间的复杂同步,方便主线程与子线程的交互。

  • 线程池的抽象程度很高,多线程和多进程的编码接口一致。

未完成

  • 对future模块的理解。

  • weakref.ref是什么?

  • 线程执行函数入口_worker的第一个参数的意思。

行动吧,在路上总比一直观望的要好,未来的你肯定会感谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时, 请及时加入群: 786229024 ,里面有各种测试开发资料和技术可以一起交流哦。

最后: 下方这份完整的软件测试视频教程已经整理上传完成,需要的朋友们可以自行领取 【保证100%免费】 在这里插入图片描述
软件测试面试文档
我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。 在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一文讲透Python线程池ThreadPoolExecutor 的相关文章

  • Windows 中的 Python 多处理池奇怪行为

    Python 多处理池在 Linux 和 Windows 之间有不同的行为 当按工作人员数量运行方法映射时 在 Linux 中 它会在您作为参数提供的特定函数的范围内运行该进程 但在 Windows 中 每个工作进程都在父进程的范围内运行
  • 从框架中获取可调用对象

    给定框架对象 由sys getframe http docs python org library sys html sys getframe 例如 我可以获得底层的可调用对象吗 代码解释 def foo frame sys getfram
  • setColumnStretch 和 setRowStretch 如何工作

    我有一个使用构建的应用程序PySide2它使用setColumnStretch用于柱拉伸和setRowStretch用于行拉伸 它工作得很好 但我无法理解它是如何工作的 我参考了 qt 文档 但它对我没有帮助 我被困在括号内的两个值上 例如
  • 使用 pyppeteer 与 asyncio 关联来抓取内容

    我用 python 结合编写了一个脚本pyppeteer随着asyncio从其登陆页面抓取不同帖子的链接 并最终通过跟踪通向其内页的 url 来获取每个帖子的标题 我这里解析的内容不是动态的 但是 我利用了pyppeteer and asy
  • 如何使用 tkinter 使用网格功能显示不同的图像?

    我想使用显示文件夹中的图像grid 但是当我尝试使用以下代码时 我得到了迭代单个图像的输出 My code def messageWindow win Toplevel path C Users HP Desktop dataset for
  • TemplateSyntaxError:“settings_tags”不是有效的标签库

    当我尝试运行此测试用例时 出现此错误 这是在我的 django 应用程序的tests py 中编写的 def test accounts register self self url http royalflag com pk accoun
  • 如何使用 Pycharm 运行 fast-api 服务器?

    我有一个简单的 API 函数 如下所示 from fastapi import FastAPI app FastAPI app get async def read root return Hello World 我正在使用启动服务器uvi
  • 让 python 脚本打印到终端而不作为标准输出的一部分返回

    我正在尝试编写一个返回值的 python 脚本 然后我可以将其传递给 bash 脚本 问题是我想要在 bash 中返回一个单一值 但我想要一些东西一路打印到终端 这是一个示例脚本 我们称之为 return5 py usr bin env p
  • 使用 Poetry 创建的 Python 项目:如何在 Visual Studio Code 中调试它?

    我有一个根据基本 Poetry 创建的 Python 项目指示 https python poetry org docs basic usage 项目文件夹是这样的 my project my project my project py F
  • 在Python中,如何通过去掉括号和大括号来打印Json

    我想以一种很好的方式打印 Json 我想去掉方括号 引号和大括号 只使用缩进和行尾来显示 json 的结构 例如 如果我有一个像这样的 Json A A1 1 A2 2 B B1 B11 B111 1 B112 2 B12 B121 1
  • 按升序对数字字符串列表进行排序

    我创建了一个SQLite https en wikipedia org wiki SQLite数据库有一个存储温度值的表 第一次将温度值按升序写入数据库 然后 我将数据库中的温度值读入列表中 然后将该列表添加到组合框中以选择温度 效果很好
  • 如何在 tkinter 后台运行函数[重复]

    这个问题在这里已经有答案了 我是 GUI 编程新手 我想用 tkinter 编写一个 Python 程序 我想要它做的就是在后台运行一个可以通过 GUI 影响的简单函数 该函数从 0 计数到无穷大 直到按下按钮为止 至少这是我想要它做的 但
  • 在基本 Tensorflow 2.0 中运行简单回归

    我正在学习 Tensorflow 2 0 我认为在 Tensorflow 中实现最基本的简单线性回归是一个好主意 不幸的是 我遇到了几个问题 我想知道这里是否有人可以提供帮助 考虑以下设置 import tensorflow as tf 2
  • PySpark DataFrame 上分组数据的 Pandas 式转换

    如果我们有一个由一列类别和一列值组成的 Pandas 数据框 我们可以通过执行以下操作来删除每个类别中的平均值 df DemeanedValues df groupby Category Values transform lambda g
  • 数据类和属性装饰器

    我一直在阅读 Python 3 7 的数据类 作为命名元组的替代品 我通常在必须将数据分组到结构中时使用它 我想知道数据类是否与属性装饰器兼容 以便为数据类的数据元素定义 getter 和 setter 函数 如果是这样 是否在某处进行了描
  • 从函数在 python 3 中创建全局变量

    我想知道为什么在函数结束后我无法访问变量 variable for raw data 代码是这样的 def htmlfrom Website URL import urllib request response urllib request
  • python 中“重载”函数的最佳方法? [复制]

    这个问题在这里已经有答案了 我正在尝试在 python 中做这样的事情 def foo x y do something at position x y def foo pos foo pos x pos y 所以我想根据我提供的参数数量调
  • 将整数转换为特定格式的十六进制字符串

    我是 python 新手 有以下问题 我需要将整数转换为 6 个字节的十六进制字符串 例如 281473900746245 gt xFF xFF xBF xDE x16 x05 十六进制字符串的格式很重要 int 值的长度是可变的 格式 0
  • 无法将 librosa 与 python 3 一起使用

    我已经在 Windows 上的 ubuntu 子系统上使用 pip3 正确安装了 librosa 但是当我尝试执行像这样的简单程序时 import librosa data sr librosa load sound mp3 print d
  • Python 中的可逆 STFT 和 ISTFT

    有没有通用的形式短时傅立叶变换 https en wikipedia org wiki Short time Fourier transform与内置于 SciPy 或 NumPy 或其他什么中的相应逆变换 这是pyplotspecgram

随机推荐

  • 波场TRON将致力于推动各方合作打击恐怖主义融资

    随着加密行业的蓬勃发展 新的挑战也接踵而至 近期 有外媒报道称 哈马斯等美国认定的国际恐怖组织涉通过波场TRON进行融资活动 在这场风波中 区块链项目波场TRON似乎成为了质疑的焦点 然而 当我们深入了解事实真相时 或许会发现事情并非传言中
  • 获取员工其当前的薪水比其manager当前薪水还高的相关信息

    11月29日 财报大部分数据落在预期内甚至小超预期 11月30日 股价埋头下跌 这是最近评论区争论最厉害的中概股之一 美团 给下跌找理由和给上涨编故事的本质都是 首先 冒泡排序是什么 冒泡排序 Bubble Sort 是一种简单的排序算法
  • 谈谈兼容性测试

    兼容性测试是一种测试软件或网站在不同的环境下是否能够正常运行和显示的测试方法 主要目的是保证软件的功能 性能和用户体验在各种条件下都达到预期的标准 兼容性测试的范围包括以下几个方面 浏览器兼容性 测试软件或网站在不同的浏览器 如Chrome
  • 界面组件DevExpress Reporting v23.1新版亮点 - UX功能增强

    DevExpress Reporting gt https www evget com product 3373 是 NET Framework下功能完善的报表平台 它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集
  • 扬帆证券:趋势线是画最低点还是收盘价?

    趋势线是股票分析中最底子的技术指标之一 趋势线是一种可帮忙股票生意者辨认价格趋势的图形方法 趋势线是可以经过联接恣意两个价格点画出的一条直线 但是 在画出趋势线时 一个常见的问题是 运用最低点还是收盘价来画趋势线 在这篇文章中 我们将从多个
  • 【计算机毕业设计】垃圾分类回收系统

    垃圾分类回收系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统垃圾分类回收系统信息管理难度大 容错率低 管理人员处理数据费工费时
  • 扬帆证券:A股股息率逼近历史新高 价值股迎配置良机

    A股公司酬谢股东积极性持续进步 本年前三季度多达243家公司现金分红 一切核算数据只包括各个报告期分红 不包括特别分红 派现公司数量及占比均创出10年来同期新高 估计分红近2300亿元 分红率靠近33 分红额及分红率均为近10年来同期次高
  • 【计算机毕业设计】民航网上订票系统

    民航网上订票系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装民航网上订票系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理
  • SQL 数据操作技巧:SELECT INTO、INSERT INTO SELECT 和 CASE 语句详解

    SQL SELECT INTO 语句 SELECT INTO 语句将数据从一个表复制到一个新表中 SELECT INTO 语法 将所有列复制到新表中 SELECT INTO newtable IN externaldb FROM oldta
  • 双馈风机虚拟惯性控制+下垂控制参与系统一次调频的Simulink模型

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Simulink仿真实现
  • 邻接表表示图进行深度优先搜索,广度优先搜索,最小生成树

    图的邻接表定义 下面用邻接表实现图的深度优先搜索和广度优先搜索 用邻接矩阵来实现最小生成树 图的邻接表 首先定义一个图的邻接表的类 里面包括图的顶点数 图的边数 顶点表数组 由于顶点表数组里存放的都是图的一个个节点 因此需要用到顶点节点和边
  • 【计算机毕业设计】宠物猫认养系统

    宠物猫认养系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装宠物猫认养系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理工作
  • 扬帆证券:什么是证券服务机构?

    股票市场上 除出资者之外有各式各样的生意主体 盘绕证券所打开的公司类型非常丰盛 在实践生意中 常与出资者有所联络的不止证券公司 还有证券服务组织 那什么是证券服务组织 它和证券公司之间有什么关系 关于这些 本文将借用相关常识作部分评论 来为
  • 基于GWO-BP灰狼算法优化BP神经网络时序回归预测研究(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码 数据 讲解文档 1 概述 基于GWO
  • 【计算机毕业设计】红色革命文物征集管理系统

    红色革命文物征集管理系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装红色革命文物征集管理系统软件来发挥其高效地信息处理的作用 可以规范信息
  • 【计算机毕业设计】私房菜定制上门服务系统

    私房菜定制上门服务系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统私房菜定制上门服务系统信息管理难度大 容错率低 管理人员处理
  • 光伏储能单相逆变器并网仿真模型(Simulink仿真实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Simulink仿真实现
  • 【计算机毕业设计】视频点播系统

    视频点播系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装视频点播系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理工作可以
  • 【计算机毕业设计】可信捐赠系统

    可信捐赠系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统可信捐赠系统信息管理难度大 容错率低 管理人员处理数据费工费时 所以专
  • 一文讲透Python线程池ThreadPoolExecutor

    01 初识 Python 中已经有了 threading 模块 为什么还需要线程池呢 线程池又是什么东西呢 在介绍线程同步的信号量机制的时候 举得例子是爬虫的例子 需要控制同时爬取的线程数 例子中创建了20个线程 而同时只允许3个线程在运行