从 python 3.6 开始异步发电机 https://www.python.org/dev/peps/pep-0525/,只需进行很少的更改即可使您的代码与 asyncio 兼容。
The io_task
函数变成协程:
async def io_task(x):
await asyncio.sleep(1)
return 2*x
The producer
发电机变成异步发电机:
async def producer(xs):
for x in xs:
yield await io_task(x)
The consumer
函数成为协程并使用aiofiles https://pypi.python.org/pypi/aiofiles、异步上下文管理和异步迭代:
async def consumer(xs):
async with aiofiles.open('output.txt', 'w') as fp:
async for x in xs:
await fp.write(str(x) + '\n')
主协程在事件循环中运行:
data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()
另外,您可以考虑使用音频流 https://github.com/vxgmichel/aiostream在生产者和消费者之间管道化一些处理操作。
编辑:不同的 I/O 任务可以通过使用轻松地在生产者端同时运行已完成 https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed:
async def producer(xs):
coros = [io_task(x) for x in xs]
for future in asyncio.as_completed(coros):
yield await future