我尝试使用run_in_executor
并有一些问题。这是代码(基本上是从文档复制粘贴)
import asyncio
import concurrent.futures
def cpu_bound(val):
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
print(f'Start task: {val}')
sum(i * i for i in range(10 ** 7))
print(f'End task: {val}')
async def async_task(val):
print(f'Start async task: {val}')
while True:
print(f'Tick: {val}')
await asyncio.sleep(1)
async def main():
loop = asyncio.get_running_loop()
## Options:
for i in range(5):
loop.create_task(async_task(i))
# 1. Run in the default loop's executor:
# for i in range(10):
# loop.run_in_executor(
# None, cpu_bound, i)
# print('default thread pool')
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
# for i in range(10):
# loop.run_in_executor(
# pool, cpu_bound, i)
# print('custom thread pool')
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor(max_workers = 10) as pool:
for i in range(10):
loop.run_in_executor(
pool, cpu_bound, i)
print('custom process pool')
while True:
await asyncio.sleep(1)
asyncio.run(main())
Case 1: run_in_executor
where executor
is None
:
async_task
的执行时间与cpu_bound
的执行。
在其他情况下async_task
的将在之后执行cpu_bound
完成了。
我想当我们使用ProcessPoolExecutor
任务不应该阻塞循环。我哪里错了?
在其他情况下async_task
的将在之后执行cpu_bound
完成了。我想当我们使用ProcessPoolExecutor
任务不应该阻塞循环。我哪里错了?
问题是with XXXPoolExecutor()
结束时关闭泳池with
堵塞。池关闭会等待挂起的任务完成,这会阻塞事件循环并且与 asyncio 不兼容。由于您的第一个变体不涉及with
声明,不存在这个问题。
解决方案很简单,删除with
语句并创建一次池(例如在顶层或在main()
),并且只是use它在函数中。如果需要,您可以通过调用显式关闭池pool.shutdown()
after asyncio.run()
已完成。
另请注意,您永远不会awaiting返回的期货loop.run_in_executor
。这是一个错误,asyncio 可能会警告您;您可能应该将返回的值收集在列表中并使用类似的内容等待它们results = await asyncio.gather(*tasks)
。这不仅会收集结果,还会确保脱线程函数中发生的异常正确传播到您的代码而不是被丢弃。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)