二、异步 Python:不同形式的并发
翻译自:Async Python: The Different Forms of Concurrency
随着 Python 3 的出现,我们听到了很多关于“异步(async)”和“并发(concurrency)”的讨论,人们可能会简单地假设 Python 最近才引入了这些概念/功能。但这显然不是,我们已经使用异步和并发操作很多次了。许多初学者可能认为 asyncio
是进行异步/并发操作的唯一/最佳方法。在本文中,我们将探讨实现并发性的不同方法以及它们的优缺点。
2.1 术语定义
在我们深入研究技术方面之前,有必要对这个上下文中经常使用的术语有一些基本的理解。
同步(Sync) vs 异步(Async)
在同步操作中,任务一个接一个地同步执行。在异步操作中,任务可以独立地启动和完成。当执行转移到新任务时,一个异步任务可以启动并继续运行。异步任务不阻塞(使执行等待其完成)操作,通常在后台运行。
例如,当你需要打电话给旅行社预订你的下一个假期。在你去旅游之前,你需要给你的老板发一封电子邮件。以同步的方式,你会先打电话给旅行社,如果他们让你稍等片刻,你就会一直等着。完成后,你就开始给老板写邮件。在这里你完成了一个又一个的任务。但如果你很聪明,当你等待的时候,你可以开始写邮件,当他们和你谈话时,你暂停写邮件,和他们谈话,然后继续写邮件。你也可以让朋友在你写完邮件的时候打电话。这是异步性。任务不会相互阻塞。
并发(Concurrency) vs 并行(Parallelism)
并发意味着两个任务一起取得进展。在前面的例子中,当我们考虑异步示例时,我们在与旅行社的通话和编写电子邮件方面都取得了进展。这是并发。但当我们谈到从一个朋友那里获得帮助时,在这种情况下,两个任务将并行运行。并行实际上是并发的一种形式。但并行性依赖于硬件。例如,如果 CPU 中只有一个内核,两个操作就不能真正并行运行。他们只是分享同一个核心的时间片。这是并发性,但不是并行性。但当我们有多个核心时,我们实际上可以并行运行两个或更多操作(取决于核心的数量)。
快速小结:
- 同步:阻塞操作。
- 异步:非阻塞操作。
- 并发:一个人同时干多件事情。
- 并行:多个人一起干多件事情。
并行意味着并发,但并发并不总是并行的。
2.2 线程(Threads)& 进程(Processes)
线程允许我们并发运行,但由于 GIL 的存在,线程没有提供并行性。然而,Python 下的多进程可以利用多核绕开 GIL。
Threads
worker
函数将以多线程、异步、并发的形式执行
import threading
import time
import random
def worker(number):
sleep = random.randrange(1, 10)
time.sleep(sleep)
print("I am Worker {}, I slept for {} seconds".format(number, sleep))
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
t.setDaemon(False) # 如果设置为 True,当主进程结束时,不管子线程有没有完成都会被迫中止
t.start()
# t.join() # 是否阻塞
print("All Threads are queued, let's see when they finish!")
# 加入 join 阻塞
# 线程由于阻塞是线性,一个接一个的运行,实际共耗时 1+4+7+7+8
$ python thread_test.py
I am Worker 0, I slept for 1 seconds
I am Worker 1, I slept for 4 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 3, I slept for 7 seconds
I am Worker 4, I slept for 8 seconds
All Threads are queued, let's see when they finish!
# 去掉 t.join(),则主进程不等,所以先输出下面打印信息
All Threads are queued, let's see when they finish!
I am Worker 3, I slept for 1 seconds
I am Worker 4, I slept for 2 seconds
I am Worker 1, I slept for 4 seconds
I am Worker 2, I slept for 4 seconds
I am Worker 0, I slept for 9 seconds
# 将 setDaemon 设置为 True,主进程输出下面打印信息就结束了,子线程还没来得及开始就被迫中止
All Threads are queued, let's see when they finish!
因此,你可以看到我们启动了5个线程,它们一起取得进展,当我们启动线程(从而执行 worker 函数)时,操作不会等待线程完成,然后再转到下一个 print 语句。所以这是一个异步操作。
进一步阅读: https://pymotw.com/3/threading/index.html
Global Interpreter Lock (GIL)
引入全局解释器锁 GIL 是为了使 CPython 的内存管理更容易,并允许更好地与 C 集成(例如扩展)。GIL 是一种锁机制,Python 解释器一次只允许运行一个线程。在任何给定的时间点只允许有一个线程可以执行 Python 字节码。这个 GIL 确保多个线程不会并行运行。
关于GIL的简要事实:
- 一次只能运行一个线程。
- Python 解释器在线程之间切换以允许并发。
- GIL 只适用于CPython(事实上的实现)。其他实现,如 Jython、IronPython 没有 GIL。
- GIL 使单线程程序快速。
- 对于 I/O 密集型操作,GIL 通常不会造成太大危害。
- GIL 使集成非线程安全的 C 库变得容易,因此我们有许多用 C 编写的高性能扩展/模块。
- 对于 CPU 密集型的任务,解释器在 N 个 ticks 和 switches 线程之间进行检查。所以一个线程不会阻塞其他线程。
许多人认为 GIL 是一种弱点。我认为这是一种幸事,因为它使得像 NumPy、SciPy 这样的库成为可能,它们使Python 在科学界占据了独特的地位。
进一步阅读:http://www.dabeaz.com/python/UnderstandingGIL.pdf
Processes
使用 multiprocessing
稍微修改一下代码:
import multiprocessing
import time
import random
def worker(number):
sleep = random.randrange(1, 10)
time.sleep(sleep)
print("I am Worker {}, I slept for {} seconds".format(number, sleep))
for i in range(5):
t = multiprocessing.Process(target=worker, args=(i,))
t.start()
print("All Processes are queued, let's see when they finish!")
用 multiprocessing
替代 threading
模块,用 Process
方法替代 Thread
方法,现在便可以充分利用 CPU 的多核。借助 Pool
类,我们可以向同一个函数传递不同的参数进行计算,例如:
from multiprocessing import Pool
import time
def f(x):
return x ** 2
if __name__ == '__main__':
a = list(range(100000000))
start = time.time()
p = Pool(10)
d = p.map(f, a)
print(time.time() - start)
start = time.time()
b = [x ** 2 for x in a]
print(time.time() - start)
start = time.time()
c = (x ** 2 for x in a)
print(time.time() - start)
# 7.4659998416900635
# 20.971519947052002
# 3.0994415283203125e-06
在这里,我们实际上是在不同的进程上运行该函数,而不是遍历值列表并逐个调用 f
。一个进程执行 f(1)
,另一个运行 f(2)
,另一个运行 f(3)
。最后,结果再次聚合到一个列表中。这将使我们能够将繁重的计算分解成更小的部分,并并行运行它们以加快计算速度。(译者注:本机实测,元组生成式最快)
from multiprocessing import Pool
import time
def f(x):
return x ** 4
num = 100000000
start_time = time.time()
p = Pool(4)
result = p.map(f, list(range(num)))
print(time.time() - start_time)
print(len(result), result[9])
# 48.95770812034607
# 100000000 6561
start_time = time.time()
result = [x**4 for x in range(num)]
print(time.time() - start_time)
print(len(result), result[9])
# 37.78309106826782
# 100000000 6561
# num = 10^7
5.439199209213257
10000000 6561
3.675222158432007
10000000 6561
# num = 10^4
0.01634526252746582
10000 6561
0.003140687942504883
10000 6561
进一步阅读: https://pymotw.com/3/multiprocessing/index.html
concurrent.futures
模块
The concurrent.futures
module packs some really great stuff for writing async codes easily. My favorites are the ThreadPoolExecutor
and the ProcessPoolExecutor
. These executors maintain a pool of threads or processes. We submit our tasks to the pool and it runs the tasks in available thread/process. A Future
object is returned which we can use to query and get the result when the task has completed.
Here’s an example of ThreadPoolExecutor
:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())
I have a blog post on the concurrent.futures
module here: http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html which might be helpful for exploring the module deeper.
Further Reading: https://pymotw.com/3/concurrent.futures/
2.3、Asyncio - Why、What、How?
你可能有很多 Python 社区的人都会问的一个问题:asyncio
带来了什么新特性?为什么我们需要多种异步 I/O 的方法?我们还接着用线程和进程吗?
为什么我们需要 asyncio?
进程的过程成本高昂。所以对于 I/O,基本会优先选择线程。我们知道,I/O 依赖于外部设备—缓慢的磁盘或糟糕的网络延迟,这通常会使 I/O 变得不可预测。现在,假设我们使用线程进行 I/O 密集型操作。3 个线程正在执行不同的 I/O 任务。解释器需要在并发线程之间切换,并给每个线程轮流一些时间。假设有 3 个线程 T1
、T2
和T3
。这三个线程已开始其 I/O 操作。T3
先完成。T2
和 T1
仍在等待 I/O。Python 解释器切换到 T1
,但它仍在等待,好吧,然后它移到 T2
,它也在等待,然后移到 T3
,T3
已经准备好并开始执行代码。你看到这里的问题了吗?
T3
已经准备好了,但是解释器先在 T2
和 T1
之间切换,这就产生了切换成本,如果解释器先转到 T3
,我们就可以避免这种问题,对吧?
什么是 asyncio?
Asyncio 为我们提供了一个事件循环(event loop)和其他好东西。事件循环跟踪不同的 I/O 事件,并切换到已准备好的任务,并暂停正在等待 I/O 的任务。因此,我们不会在尚未准备好立即运行的任务上浪费时间。
这个想法很简单。有一个事件循环。我们有运行异步 I/O 操作的函数。我们将函数赋给事件循环,并要求它为我们运行这些函数。事件循环给我们一个 Future
对象,就像一个承诺,我们将在未来得到一些东西。我们拿到这个承诺,一次又一次地检查它是否有值(当我们感到不耐烦的时候),最后当 future 有值的时候,我们在其他一些操作中使用它。
Asyncio 使用生成器和协程来暂停和恢复任务。你可以阅读以下文章了解更多详细信息:
- http://masnun.com/2015/11/20/python-asyncio-future-task-and-the-event-loop.html
- http://masnun.com/2015/11/13/python-generators-coroutines-native-coroutines-and-async-await.html
我们应该怎么使用 asyncio?
我们先看看下面这个例子
import asyncio
import datetime
import random
async def my_sleep_func():
await asyncio.sleep(random.randint(0, 5))
async def display_date(num, loop):
end_time = loop.time() + 50.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
await my_sleep_func()
loop.stop() # 原文缺失,不加这句话,loop 永远在运行,程序永远不结束
loop = asyncio.get_event_loop()
asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))
loop.run_forever()
注意 async/await
语法只在 python 3.5+ 版本中,如果我们浏览代码会发现:
- 我们有一个异步函数
display_date
,它接收一个数字(一个标志号)及事件循环作为参数
- 这个函数有一个死循环,在 50s 后退出循环,在这 50s 内,它不断的输出当前时间及休眠一会儿。
await
函数可以等待其他异步函数(协程)先去完成。
- 我们把函数传递到事件循环中(使用
ensure_future
方法)
- 开始运行事件循环
无论何时进行 await
调用,asyncio 都知道函数可能需要一些时间。因此,它暂停执行,开始监视与其相关的任何 I/O 事件,并允许任务运行。当 asyncio 注意到暂停的函数的 I/O 已就绪时,它将恢复该函数。
2.4、如何做出正确的选择
我们已经介绍了进程、线程、协程等多种并发方法,还是那个问题,我们应该选择哪一个呢?这取决于使用情景,根据使用经验以及相关资料推荐,我整理了下面这份伪代码:
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Threads")
else:
print("Multi Processing")
- CPU 密集型 => 使用多进程(我擦,有时候多进程不一定干的过单线程,比如上面的示例,小任务量还是别用了,耗资源太狠了而且进程管理成本还很高)
- I/O 密集型, I/O 很快 => 多线程
- I/O 密集型, I/O 很慢, => Asyncio (协程)