Python3 ThreadPoolExecutor--线程池

2023-10-27

1、线程池创建

    def __init__(self, 
        max_workers=None, 
        thread_name_prefix='', 
        initializer=None, 
        initargs=())

max_workers: 设置线程池中最多能同时运行的线程数目

thread_name_prefix:一个可选的名称前缀,用于给线程命名

initializer : 线程可回调的函数

initargs: initializer 的参数

2、submit

提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄

def submit(self, fn, /, *args, **kwargs):

即将fn函数提交给线程池,*args代表传给fn函数的参数,**kwargs代表以关键字参数的形式为fn函数传入参数。

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
submit 函数返回的为 future 对象 ,future 各个方法如下:
class Future(Generic[_T]):
    # 初始化
    def __init__(self) -> None: ...

    # Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
    def cancel(self) -> bool: ...

    # Return True if the future was cancelled.
    def cancelled(self) -> bool: ...

    # Return True if the future is currently executing
    def running(self) -> bool: ...

    # Return True of the future was cancelled or finished executing
    def done(self) -> bool: ...

    #"""Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
     """
    def add_done_callback(self, fn: Callable[[Future[_T]], Any]) -> None: ...

    #"""Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        阻塞式的获取任务的返回值
    """
    def result(self, timeout: float | None = ...) -> _T: ...

    #"""Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
     """
    def set_running_or_notify_cancel(self) -> bool: ...

    #"""Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
    """
    def set_result(self, result: _T) -> None: ...

    #"""Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
    """
    def exception(self, timeout: float | None = ...) -> BaseException | None: ...

    #"""Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
    """
    def set_exception(self, exception: BaseException | None) -> None: ...

3、as_completed

一次取出所有任务的结果,详情请看 as_completed 源码,tasks 为 submit 返回的 future 列表

from concurrent.futures import ThreadPoolExecutor, as_completed
 
for future in as_completed(tasks):
    data = future.result()


4、map

另一种获取线程池返回结果的方法

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """

5、wait

方法可以让主线程阻塞,直到满足设定的要求。tasks 为 submit 返回的 future 列表

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED

wait(tasks, return_when=ALL_COMPLETED)

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures. Duplicate futures given to *fs* are removed and will be 
        returned only once.
    """

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python3 ThreadPoolExecutor--线程池 的相关文章

  • 在PyGI中获取窗口句柄

    在我的程序中 我使用 PyGObject PyGI 和 GStreamer 在 GUI 中显示视频 该视频显示在Gtk DrawingArea因此我需要获取它的窗口句柄realize 信号处理程序 在 Linux 上 我使用以下方法获取该句
  • Tkinter 菜单删除项

    如何删除任何菜单项 例如我想删除 播放 self menubar Menu self root self root config menu self menubar self filemenu2 Menu self menubar self
  • 如何使用 python 的 http.client 准确读取一个响应块?

    Using http client在 Python 3 3 或任何其他内置 python HTTP 客户端库 中 如何一次读取一个分块 HTTP 响应一个 HTTP 块 我正在扩展现有的测试装置 使用 python 编写 http clie
  • Python re无限执行

    我正在尝试执行这段代码 import re pattern r w w s re compiled re compile pattern results re compiled search COPRO HORIZON 2000 HOR p
  • 从 pyspark.sql 中的列表创建数据框

    我完全陷入了有线的境地 现在我有一个清单li li example data map lambda x get labeled prediction w x collect print li type li 输出就像 0 0 59 0 0
  • 为什么第二个 request.session cookies 返回空?

    我想使用 requests Session post 登录网站 但是当我已经登录主页 然后进入帐户页面时 看来cookies还没有保存 因为cookies是空的 而且我无法进入正确的帐户页面 import requests from bs4
  • python celery -A 的无效值无法加载应用程序

    我有一个以下项目目录 azima init py main py tasks py task py from main import app app task def add x y return x y app task def mul
  • 如何使用 opencv python 计算乐高积木上的孔数?

    我正在开发我的 python 项目 我需要计算每个乐高积木组件中有多少个孔 我将从输入 json 文件中获取有关需要计算哪个程序集的信息 如下所示 img 001 red 0 blue 2 white 1 grey 1 yellow 1 r
  • 在Python中读取tiff标签

    我正在尝试用 Python 读取 tiff 文件的标签 该文件是 RGB 的uint16每个通道的值 我目前正在使用tifffile import tifffile img tifffile imread file tif 然而 img是一
  • 在 Mac OSX 上从 Python 3.6 运行 wine 命令

    我正在尝试用 Python 编写一个打开的脚本wine然后发送代码到wine终端打开一个 exe程序 这 exe程序也是命令驱动的 我可以打开wine 但我无法进一步 import shlex subprocess line usr bin
  • 在 Mac OS X 上安装 libxml2 时出现问题

    我正在尝试在我的 Mac 操作系统 10 6 4 上安装 libxml2 我实际上正在尝试在 Python 中运行 Scrapy 脚本 这需要我安装 Twisted Zope 现在还需要安装 libxml2 我已经下载了最新版本 2 7 7
  • Jupyter 笔记本中未显示绘图图表

    我已经尝试解决这个问题几个小时了 我按照上面的步骤操作情节网站 https plot ly python getting started start plotting online并且图表仍然没有显示在笔记本中 这是我的情节代码 color
  • NumPy 相当于 Keras 函数 utils.to_categorical

    我有一个使用 Keras 进行机器学习的 Python 脚本 我正在构建 X 和 Y 它们分别是特征和标签 标签的构建方式如下 def main depth 10 nclass 101 skip True output True video
  • 使用seaborn绘制简单线图

    我正在尝试使用seaborn python 绘制ROC曲线 对于 matplotlib 我只需使用该函数plot plt plot one minus specificity sensitivity bs where one minus s
  • pygame:使用 sprite.RenderPlain 绘制精灵组的顺序

    我有一个精灵组 需要按一定的顺序绘制 以便其精灵按应有的方式重叠 然而 即使使用运算符模块函数 sorted self sprites key attrgetter y x 对组进行排序 顺序也是错误的 我该如何解决这个问题 直截了当地说
  • Discord.py 嵌入中禁用按钮/冻结按钮

    I m trying to make a replica of this bot in which when I press any of the buttons below it shows a dropdown menu and you
  • Python守护进程:保持日志记录

    我有一个将一些数据记录到磁盘的脚本 logging basicConfig filename davis debug log level logging DEBUG logging basicConfig filename davis er
  • 在matlab中,如何读取python pickle文件?

    在 python 中 我生成了一个 p 数据文件 pickle dump allData open myallData p wb 现在我想在Matlab中读取myallData p 我的Matlab安装在Windows 8下 其中没有Pyt
  • Jupyter Notebook:带有小部件的交互式绘图

    我正在尝试生成一个依赖于小部件的交互式绘图 我遇到的问题是 当我使用滑块更改参数时 会在前一个绘图之后完成一个新绘图 而我预计只有一个绘图会根据参数发生变化 Example from ipywidgets import interact i
  • 描述符“join”需要“unicode”对象,但收到“str”

    代码改编自here http wiki geany org howtos convert camelcase from foo bar to Foo Bar def lower case underscore to camel case s

随机推荐

  • 机器学习:Logistic回归介绍

    Logistic回归定义 简单来说 逻辑回归 Logistic Regression 是一种用于解决二分类 0 or 1 问题的机器学习方法 用于估计某种事物的可能性 比如某用户购买某商品的可能性 某病人患有某种疾病的可能性 以及某广告被用
  • 【转载】Java Instrument 功能使用及原理

    0 介绍 利用 java lang instrument 做动态 Instrumentation 是 Java SE 5 的新特性 它把 Java 的 instrument 功能从本地代码中解放出来 使之可以用 Java 代码的方式解决问题
  • elasticsearch FunctionScore Java API

    elasticsearch FunctionScore java API 1 使用script FunctionScoreQueryBuilder query QueryBuilders functionScoreQuery queryBu
  • 修改微信小程序官方picker-view日期选择器

    微信小程序问题之picker view日期选择器 项目需求截图 选择出生日期的时候需要用到微信的picker view日期选择器 然后就发现官方picker view组件有一些问题 1 初始化日期不是当前日期 2 选择不同月份 日期都是从1
  • NVIDIA Video Codec SDK简介

    NVIDIA的Video Codec SDK提供API对视频进行加速编解码 最新发布版本为12 0 支持Windows和Linux平台 可从 https developer nvidia com video codec sdk archiv
  • python 操作 doc /docx

    对于python来说操作 doc 需要用到 win32com 安装 pip install win32com 优点 doc所有的操作都可以执行 缺点 如果没有office就死翘翘了 当然也可以com wsp 对于这种需要强制安装xx的不是很
  • Redis系列(一)与同类对比,底层数据结构,阿里云简单部署

    Redis与Memcache Ehcache对比 有持久化需求或者对数据结构和处理有高级要求的应用 选择redis 其他简单的key value存储 选择memcache Memcache适合多读少写 大数据量的情况 如人人网大量查询用户信
  • Docker搭建rtmp视频直播

    一 PC端搭建RTMP服务器 1 安装docker sudo apt get install docker sudo apt get update 2 下载docker nginx rtmp容器 sudo apt get install d
  • html中如何获取表单的数据

    1 使用表单中的id属性获取表单中的数据 在表单的input标签中指定id属性值 在script脚本中可以通过 表格的名称 id名称 value 来访问表单中的值 例如 register form username value 表单代码
  • 整理最新java面试宝典2019

    java面试宝典2019 参照 http www wityx com 3 html 1 meta标签的作用是什么 2 ReenTrantLock可重入锁 和synchronized的区别 总结 3 Spring中的自动装配有哪些限制 4 什
  • 用U盘安装Linux系统

    需要的东西 1 软件 Universal USB Installer 2 U盘 容量至少为4GB 3 linux系统的镜像文件 比如 ubuntu 10 04 3 desktop i386 iso或者ubuntu 10 04 desktop
  • 在排序数组中查找元素的第一个和最后一个位置

    本文就来探究几个最常用的二分查找场景 寻找一个数 寻找左侧边界 寻找右侧边界 而且 我们就是要深入细节 比如不等号是否应该带等号 mid 是否应该加一等等 分析这些细节的差异以及出现这些差异的原因 保证你能灵活准确地写出正确的二分查找算法
  • Alibaba 神器!一招定位 线 上Bug

    尊重原创版权 https www csnovel com hot 43764 html 更多内容参考 https www csnovel com 阿里神器 一招定位 线 上Bug 背景 公司有个渠道系统 专门对接三方渠道使用 没有什么业务逻
  • windows计算机锁屏的快捷键是什么,win10电脑锁屏快捷键是什么

    我们在使用win10操作系统的时候 其中有很多快捷方式我们都是可以直接使用键盘操作而省去鼠标键盘配合操作的 这样做的优点就是会非常的有效率 节省时间而且没有繁琐的步骤 有的小伙伴想知道我们win10的锁屏快捷键是什么 那么现在就让小编来告诉
  • 杰理之蓝牙OTA蓝牙升级【篇】

    命令AT OTA r响应成功 r nOK r n失败 r nERR data r n
  • gitee码云的使用 ----- 将项目上传

    准备工作 首先你得安装好git 安装教程如下 https git scm com downloads 去码云官网注册登录 然后创建仓库按照步骤做完会得到一个https地址 然后下面就会用到 接下来就是如何将自己的项目代码放置到gitee上
  • 记录C++类中的一次函数调用

    引用 之前遇到一次函数调用结果的问题 今天在这里做一下记录 一个基类 一个派生类 两个类中都有一个函数名相同 参数相同 参数不同的函数 创建基类指针指向派生类对象 调用相应的函数 派生类指针指向基类对象 调用相应的函数 求其输出结果 具体看
  • 基于STM32使用超声波HC-SR04模块

    写在前面注意的几点 1 HC SR04模块必须使用5V供电 不能是3 3V 2 若单是测距 无需使用中断 3 Echo和Trig两个引脚可以任意接可用的GPIO 和定时器无关 说一下超声波的工作原理 单片机给Trig引脚一个最少10us的高
  • python matplotlib 坐标轴打断

    想要绘制出如下类型坐标轴断开的图 matplotlib中并没有直接可用的API 但是官方给出了一个demo broken axis py 大概说下思路 画出两个共享X轴 完全相同的图 下图取消上边界 下图取消下边界 然后再使用plot画两组
  • Python3 ThreadPoolExecutor--线程池

    1 线程池创建 def init self max workers None thread name prefix initializer None initargs max workers 设置线程池中最多能同时运行的线程数目 threa