Combining asyncio with a multi-worker ProcessPoolExecutor

2023-12-27

Is it possible to take a blocking function such as work() and run it concurrently in a ProcessPoolExecutor that has more than one worker?

import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()

def work():
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)

Running the code above on a machine with 4+ cores takes roughly 4 seconds. How would you write producer() so that the example takes only about 1 second?


await loop.run_in_executor(executor, work) blocks the event loop until work() completes, so only one job runs at a time.

To run the jobs concurrently, you can use asyncio.as_completed:

async def producer():
    # Submit every job before awaiting any of them, so the pool
    # workers run in parallel. (The loop= argument that as_completed
    # used to accept was removed in Python 3.10.)
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)
