异步队列消费者协程

2024-02-21

我有一个asyncio.Protocol子类从服务器接收数据。 我将这些数据(每行,因为数据是文本)存储在asyncio.Queue.

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            yield q.put(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

我想要另一个协程负责消耗队列中的数据并处理它。

  • 这应该是一个asyncio.Task?
  • 如果队列因几秒钟内没有收到数据而变空怎么办?我如何确保我的消费者不会停止(run_until_complete)?
  • 有没有比为队列使用全局变量更干净的方法?

这应该是 asyncio.Task 吗?

是的,使用创建它asyncio.ensure_future https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future or 循环创建任务 https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.create_task.

如果队列因几秒钟内没有收到数据而变空怎么办?

只需使用队列.get https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.get等待物品可用:

async def consume(queue):
    while True:
        item = await queue.get()
        print(item)

有没有比为队列使用全局变量更干净的方法?

是的,只需将其作为参数传递给消费者协程和流协议:

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue

    def data_received(self, data):
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

如何确保我的消费者不会停止(run_until_complete)?

连接关闭后,使用队列.加入 https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue.join等到队列为空。


完整示例:

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()

或者,您也可以使用streams https://docs.python.org/3/library/asyncio-stream.html?highlight=streams#tcp-echo-client-using-streams:

async def tcp_client(host, port, loop=None):
    reader, writer = await asyncio.open_connection(host, port, loop=loop)
    async for line in reader:
        print(line.rstrip())
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

异步队列消费者协程 的相关文章

随机推荐