正如该输出告诉您的那样,run_in_executor
返回一个Future
。您需要等待它才能得到结果。
record = await loop.run_in_executor(
None, something_cpu_bound_task_here, record
)
请注意,任何参数something_cpu_bound_task_here
需要传递给run_in_executor
.
此外,正如您所提到的,这是一个 CPU 密集型任务,您需要确保您使用的是concurrent.futures.ProcessPoolExecutor
。除非你打过电话loop.set_default_executor
某处,默认值是一个实例ThreadPoolExecutor
.
with ProcessPoolExecutor() as executor:
for record in records:
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
最后,您的 while 循环有效地同步运行。你需要等待未来,然后等待obj.add
在继续处理下一个项目之前records
。您可能想稍微重组您的代码并使用类似的东西gather
以允许一些并发。
async def process_record(record, obj, loop, executor):
record = await loop.run_in_executor(
executor, something_cpu_bound_task_here, record
)
await obj.add(record)
async def fun():
loop = asyncio.get_running_loop()
records = await receive()
with ProcessPoolExecutor() as executor:
await asyncio.gather(
*[process_record(record, obj, loop, executor) for record in records]
)
我不知道如何处理obj
因为您的示例中没有定义这一点,但我相信您可以弄清楚。