我计划有一个基于异步队列的生产者-消费者实现来处理实时数据,其中以正确的时间顺序发送数据至关重要。这是它的代码片段:
async def produce(Q, n_jobs):
for i in range(n_jobs):
print(f"Producing :{i}")
await Q.put(i)
async def consume(Q):
while True:
n = await Q.get()
print(f"Consumed :{n}")
x = do_sometask_and_return_the_result(n)
print(f"Finished :{n} and Result: {x}")
async def main(loop):
Q = asyncio.Queue(loop=loop, maxsize=3)
await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
print("Done")
这里生产者生产数据并将其放入异步队列中。我有多个消费者来消费和处理数据。在查看输出时,在打印“Consumed :{n}”时保持顺序(如 1,2,3,4... 等),这完全没问题。但是,由于函数 do_sometask_and_return_the_result(n) 需要可变的时间来返回结果,因此在 n "Finished :{n}" 的下一个打印中不会保持顺序(如 2,1,4,3,5,.. .)。
由于我需要维护打印结果的顺序,有什么方法可以同步这些数据吗?即使在 do_sometask_and_return_the_result(n) 之后,我也想看到 'n' 的 1,2,3,4,.. 顺序打印。