Python 并发系列 2 —— 各种并发方案的选择

2023-11-17

二、异步 Python:不同形式的并发

翻译自:Async Python: The Different Forms of Concurrency

随着 Python 3 的出现,我们听到了很多关于“异步(async)”和“并发(concurrency)”的讨论,人们可能会简单地假设 Python 最近才引入了这些概念/功能。但这显然不是,我们已经使用异步和并发操作很多次了。许多初学者可能认为 asyncio 是进行异步/并发操作的唯一/最佳方法。在本文中,我们将探讨实现并发性的不同方法以及它们的优缺点。

2.1 术语定义

在我们深入研究技术方面之前,有必要对这个上下文中经常使用的术语有一些基本的理解。

同步(Sync) vs 异步(Async)

在同步操作中,任务一个接一个地同步执行。在异步操作中,任务可以独立地启动和完成。当执行转移到新任务时,一个异步任务可以启动并继续运行。异步任务不阻塞(使执行等待其完成)操作,通常在后台运行。

例如,当你需要打电话给旅行社预订你的下一个假期。在你去旅游之前,你需要给你的老板发一封电子邮件。以同步的方式,你会先打电话给旅行社,如果他们让你稍等片刻,你就会一直等着。完成后,你就开始给老板写邮件。在这里你完成了一个又一个的任务。但如果你很聪明,当你等待的时候,你可以开始写邮件,当他们和你谈话时,你暂停写邮件,和他们谈话,然后继续写邮件。你也可以让朋友在你写完邮件的时候打电话。这是异步性。任务不会相互阻塞。

并发(Concurrency) vs 并行(Parallelism)

并发意味着两个任务一起取得进展。在前面的例子中,当我们考虑异步示例时,我们在与旅行社的通话和编写电子邮件方面都取得了进展。这是并发。但当我们谈到从一个朋友那里获得帮助时,在这种情况下,两个任务将并行运行。并行实际上是并发的一种形式。但并行性依赖于硬件。例如,如果 CPU 中只有一个内核,两个操作就不能真正并行运行。他们只是分享同一个核心的时间片。这是并发性,但不是并行性。但当我们有多个核心时,我们实际上可以并行运行两个或更多操作(取决于核心的数量)。

快速小结

  • 同步:阻塞操作。
  • 异步:非阻塞操作。
  • 并发:一个人同时干多件事情。
  • 并行:多个人一起干多件事情。

并行意味着并发,但并发并不总是并行的。

2.2 线程(Threads)& 进程(Processes)

线程允许我们并发运行,但由于 GIL 的存在,线程没有提供并行性。然而,Python 下的多进程可以利用多核绕开 GIL。

Threads

worker 函数将以多线程、异步、并发的形式执行

import threading
import time
import random

def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.setDaemon(False)   # 如果设置为 True,当主进程结束时,不管子线程有没有完成都会被迫中止
    t.start()
    # t.join()   # 是否阻塞

print("All Threads are queued, let's see when they finish!")
# 加入 join 阻塞
# 线程由于阻塞是线性,一个接一个的运行,实际共耗时 1+4+7+7+8
$ python thread_test.py
I am Worker 0, I slept for 1 seconds
I am Worker 1, I slept for 4 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 3, I slept for 7 seconds
I am Worker 4, I slept for 8 seconds
All Threads are queued, let's see when they finish!

# 去掉 t.join(),则主进程不等,所以先输出下面打印信息
All Threads are queued, let's see when they finish!
I am Worker 3, I slept for 1 seconds
I am Worker 4, I slept for 2 seconds
I am Worker 1, I slept for 4 seconds
I am Worker 2, I slept for 4 seconds
I am Worker 0, I slept for 9 seconds

# 将 setDaemon 设置为 True,主进程输出下面打印信息就结束了,子线程还没来得及开始就被迫中止
All Threads are queued, let's see when they finish!

因此,你可以看到我们启动了5个线程,它们一起取得进展,当我们启动线程(从而执行 worker 函数)时,操作不会等待线程完成,然后再转到下一个 print 语句。所以这是一个异步操作。

进一步阅读: https://pymotw.com/3/threading/index.html

Global Interpreter Lock (GIL)

引入全局解释器锁 GIL 是为了使 CPython 的内存管理更容易,并允许更好地与 C 集成(例如扩展)。GIL 是一种锁机制,Python 解释器一次只允许运行一个线程。在任何给定的时间点只允许有一个线程可以执行 Python 字节码。这个 GIL 确保多个线程不会并行运行。

关于GIL的简要事实:

  • 一次只能运行一个线程。
  • Python 解释器在线程之间切换以允许并发。
  • GIL 只适用于CPython(事实上的实现)。其他实现,如 Jython、IronPython 没有 GIL。
  • GIL 使单线程程序快速。
  • 对于 I/O 密集型操作,GIL 通常不会造成太大危害。
  • GIL 使集成非线程安全的 C 库变得容易,因此我们有许多用 C 编写的高性能扩展/模块。
  • 对于 CPU 密集型的任务,解释器在 N 个 ticks 和 switches 线程之间进行检查。所以一个线程不会阻塞其他线程。

许多人认为 GIL 是一种弱点。我认为这是一种幸事,因为它使得像 NumPy、SciPy 这样的库成为可能,它们使Python 在科学界占据了独特的地位。

进一步阅读:http://www.dabeaz.com/python/UnderstandingGIL.pdf

Processes

使用 multiprocessing 稍微修改一下代码:

import multiprocessing
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = multiprocessing.Process(target=worker, args=(i,))
    t.start()

print("All Processes are queued, let's see when they finish!")

multiprocessing 替代 threading 模块,用 Process 方法替代 Thread 方法,现在便可以充分利用 CPU 的多核。借助 Pool 类,我们可以向同一个函数传递不同的参数进行计算,例如:

from multiprocessing import Pool
import time


def f(x):
    return x ** 2


if __name__ == '__main__':
    a = list(range(100000000))
    start = time.time()
    p = Pool(10)
    d = p.map(f, a)
    print(time.time() - start)

    start = time.time()
    b = [x ** 2 for x in a]
    print(time.time() - start)

    start = time.time()
    c = (x ** 2 for x in a)
    print(time.time() - start)

# 7.4659998416900635
# 20.971519947052002
# 3.0994415283203125e-06

在这里,我们实际上是在不同的进程上运行该函数,而不是遍历值列表并逐个调用 f。一个进程执行 f(1),另一个运行 f(2),另一个运行 f(3)。最后,结果再次聚合到一个列表中。这将使我们能够将繁重的计算分解成更小的部分,并并行运行它们以加快计算速度。(译者注:本机实测,元组生成式最快)

from multiprocessing import Pool
import time

def f(x):
    return x ** 4

num = 100000000

start_time = time.time()
p = Pool(4)
result = p.map(f, list(range(num)))
print(time.time() - start_time)
print(len(result), result[9])
# 48.95770812034607
# 100000000 6561

start_time = time.time()
result = [x**4 for x in range(num)]
print(time.time() - start_time)
print(len(result), result[9])
# 37.78309106826782
# 100000000 6561

# num = 10^7
5.439199209213257
10000000 6561
3.675222158432007
10000000 6561

# num = 10^4
0.01634526252746582
10000 6561
0.003140687942504883
10000 6561

进一步阅读: https://pymotw.com/3/multiprocessing/index.html

concurrent.futures 模块

The concurrent.futures module packs some really great stuff for writing async codes easily. My favorites are the ThreadPoolExecutor and the ProcessPoolExecutor. These executors maintain a pool of threads or processes. We submit our tasks to the pool and it runs the tasks in available thread/process. A Future object is returned which we can use to query and get the result when the task has completed.

Here’s an example of ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

I have a blog post on the concurrent.futures module here: http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html which might be helpful for exploring the module deeper.

Further Reading: https://pymotw.com/3/concurrent.futures/

2.3、Asyncio - Why、What、How?

你可能有很多 Python 社区的人都会问的一个问题:asyncio带来了什么新特性?为什么我们需要多种异步 I/O 的方法?我们还接着用线程和进程吗?

为什么我们需要 asyncio?

进程的过程成本高昂。所以对于 I/O,基本会优先选择线程。我们知道,I/O 依赖于外部设备—缓慢的磁盘或糟糕的网络延迟,这通常会使 I/O 变得不可预测。现在,假设我们使用线程进行 I/O 密集型操作。3 个线程正在执行不同的 I/O 任务。解释器需要在并发线程之间切换,并给每个线程轮流一些时间。假设有 3 个线程 T1T2T3。这三个线程已开始其 I/O 操作。T3 先完成。T2T1 仍在等待 I/O。Python 解释器切换到 T1,但它仍在等待,好吧,然后它移到 T2,它也在等待,然后移到 T3T3 已经准备好并开始执行代码。你看到这里的问题了吗?

T3 已经准备好了,但是解释器先在 T2T1 之间切换,这就产生了切换成本,如果解释器先转到 T3,我们就可以避免这种问题,对吧?

什么是 asyncio?

Asyncio 为我们提供了一个事件循环(event loop)和其他好东西。事件循环跟踪不同的 I/O 事件,并切换到已准备好的任务,并暂停正在等待 I/O 的任务。因此,我们不会在尚未准备好立即运行的任务上浪费时间。

这个想法很简单。有一个事件循环。我们有运行异步 I/O 操作的函数。我们将函数赋给事件循环,并要求它为我们运行这些函数。事件循环给我们一个 Future 对象,就像一个承诺,我们将在未来得到一些东西。我们拿到这个承诺,一次又一次地检查它是否有值(当我们感到不耐烦的时候),最后当 future 有值的时候,我们在其他一些操作中使用它。

Asyncio 使用生成器和协程来暂停和恢复任务。你可以阅读以下文章了解更多详细信息:

  • http://masnun.com/2015/11/20/python-asyncio-future-task-and-the-event-loop.html
  • http://masnun.com/2015/11/13/python-generators-coroutines-native-coroutines-and-async-await.html
我们应该怎么使用 asyncio?

我们先看看下面这个例子

import asyncio
import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()
		loop.stop()    # 原文缺失,不加这句话,loop 永远在运行,程序永远不结束  

loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

注意 async/await 语法只在 python 3.5+ 版本中,如果我们浏览代码会发现:

  • 我们有一个异步函数 display_date ,它接收一个数字(一个标志号)及事件循环作为参数
  • 这个函数有一个死循环,在 50s 后退出循环,在这 50s 内,它不断的输出当前时间及休眠一会儿。await 函数可以等待其他异步函数(协程)先去完成。
  • 我们把函数传递到事件循环中(使用 ensure_future 方法)
  • 开始运行事件循环

无论何时进行 await 调用,asyncio 都知道函数可能需要一些时间。因此,它暂停执行,开始监视与其相关的任何 I/O 事件,并允许任务运行。当 asyncio 注意到暂停的函数的 I/O 已就绪时,它将恢复该函数。

2.4、如何做出正确的选择

我们已经介绍了进程、线程、协程等多种并发方法,还是那个问题,我们应该选择哪一个呢?这取决于使用情景,根据使用经验以及相关资料推荐,我整理了下面这份伪代码:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
       print("Use Threads")
else:
    print("Multi Processing")
  • CPU 密集型 => 使用多进程(我擦,有时候多进程不一定干的过单线程,比如上面的示例,小任务量还是别用了,耗资源太狠了而且进程管理成本还很高)
  • I/O 密集型, I/O 很快 => 多线程
  • I/O 密集型, I/O 很慢, => Asyncio (协程)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 并发系列 2 —— 各种并发方案的选择 的相关文章

随机推荐

  • 我的 Android 求职简历

    昨晚在Diycode的微信交流群里面 有位在校的童鞋想要找一份开发的实习工作 他把简历做好后在群上共享了一份 我看到后便下载了一份 看了看简历内容 我在想如果我是招聘单位看简历的 这份简历可以说是基本没戏的 因为内容基本和开发没有多大关系
  • Qt5Error:msvc-version.conf loaded but QMAKE_MSC_VER ins‘t set

    错误描述 msvc version conf loaded but QMAKE MSC VER ins t set 解决方案 这种情况一般都是修改了项目的配置之后出现的 也就是 qmake stash文件出错 删除这个文件再进行重新编译即可
  • 基于python的布尔盲注爆破脚本(sqli-libs第八关)

    写这个脚本的原因是因为布尔爆破步骤的繁琐 因此写下这个半自动化脚本来提升效率 只需输入url和标志词便可开始爆破 下面结合sqli libs第八关来详细说明 这一关是布尔盲注 布尔盲注用于页面没有回显的情况下 但是心细的同学会发现当我们注入
  • 使用Microsoft.Reporting. WebForms中ReportViewer报表查看器(Server方式)

    添加Web Reference http
  • Flex程序编译

    Makefile三要素 目标 依赖 命令 详解可见makefile 编写 周北 CSDN博客 makefile 编写 Makefile中常用函数和自动化变量 wildcard 扩展通配符 例 OBJECTS wildcard o 该找到目标
  • C++构造函数中不调用虚函数的原因

    今天在看网上一篇帖子的时候看到这个问题 试讲关于C 对象虚函数表和类型信息的 RTTI 的 正好看到了如下内容 这个是为什么在构造函数中不能调用虚函数的原因 因为任何时候在基类中的虚函数调用 都不可能到达子类的实现 因为子类的虚表初始化是在
  • [docker]搭建elasticsearch服务

    1 拉取镜像 docker pull elasticsearch 8 7 0 如果需要其他版本的话 访问 Dockerhttps hub docker com ela
  • QT从入门到入土(四)——文件的读写操作

    引言 文件的读写是很多应用程序具有的功能 甚至某些应用程序就是围绕着某一种格式文件的处理而开发的 所以文件读写是应用程序开发的一个基本功能 Qt 提供了两种读写纯文本文件的基本方法 用 QFile 类的 IODevice 读写功能直接进行读
  • uniapp中的分享功能实现(APP,小程序,公众号)

    uniapp中的分享功能实现 APP 小程序 公众号 1 APP端的分享 app端的分享可以直接使用uniapp封装的方法uni share uni app的App引擎已经封装了微信 QQ 微博的分享SDK 开发者可以直接调用相关功能 可以
  • Android交叉编译OpenCV+FFmpeg+x264的艰难历程

    前言 如果你没有兴趣看完本文 只想获得可编译的代码或编译后的产物 可以直接点击下面的链接 跟随步骤编译代码或直接下载我编译好的产物 注 编译顺序要按照 x264 gt FFmpeg gt OpenCV 这样来 x264 FFmpeg Ope
  • 使用less处理重复性background-image定位问题

    1 问题描述 使用angular的列表循环解释问题 其他框架类似 css上面使用flex布局 index html代码 div class container div class item div class div p class ite
  • CH3-HarmonyOS开发基础

    文章目录 背景 目标 一 APP 1 1 APP包组成 1 2 APP和HAP结构 二 Ability 2 1 Ability 2 2 pack info 三 libs库文件 3 1 HAR 四 resources资源文件 4 1 reso
  • shiro框架---关于用户登录退出接口的介绍

    接上一篇文章shiro框架 shiro配置用户名和密码的注意 项目已分享到GitHub上 如果需要的可以看下 springboot shiro项目Git下载地址 在我前几篇文章里有shiro配置的文件下载包 下载后里边有四个配置文件Shir
  • 618省心凑背后的新算法——个性化凑单商品打包购推荐

    作为购物导购链路的一个重要环节 凑单旨在快速帮助用户找到达成某个满减门槛 比如满300减50 的商品 完成性价比最高的跨店组合结算 前言 背景 凑单是一个历史悠久的场景 伴随着长期优化并不断升级 为用户决策提供了便捷通道 作为购物导购链路的
  • Nginx部署前端,并转发2个后台,实现负载均衡

    一 vue打包 cmd进入项目目录 执行 npm run build 会在改目录生成dist文件 假设dist目录是 D dist 二 部署 下载nginx 修改nginx conf 在http 中加入 upstream myapp1 se
  • 在SQL中直接使用存储过程查询返回的结果集

    在实际使用存储过程是 有时我们希望先判断存储过程的返回结果集是否有记录 然后走不同的业务逻辑 这是就需要在SQL语句中直接读取到存储过程的返回结果集 方式如下 先按照存储过程结果集定义一个变量 declare tbl table 门诊号 v
  • js预编译(与C预处理区别)

    目录 1 函数体内 例 2 全局 注 window 属性和 imply global属性 3 全局和函数体内结合 优先顺序 例1 例2 例3 重要提示 第一次学的时候以为和C预处理差不多 看了下才发现区别还蛮大的 例1 test 打印出 a
  • 说一下Photo服务器

    误打误撞学习了一下Photo服务器 自己去百度上找 有些问题大家也没说清楚 所以 在这里补充说明一下 现在Photo官网已经更新到4 0 29 说不定有更新了 但是很多教程都是Photo3 0的版本 虽然大体还是差不多 但是对于初学者来说还
  • mysql useunicode_jdbc连接mysql 为什么在连接时已经这样设置了 ?useUnicode=true&characterEncodin...

    jdbc连接mysql 为什么在连接时已经这样设置了 useUnicode true characterEncoding UTF 8 autoReconnect true maxReconnects 10 autoReconnectForP
  • Python 并发系列 2 —— 各种并发方案的选择

    目录 二 异步 Python 不同形式的并发 2 1 术语定义 同步 Sync vs 异步 Async 并发 Concurrency vs 并行 Parallelism 2 2 线程 Threads 进程 Processes Threads