我正在尝试使用 python、asyncio 和同时使用多个队列asynqp http://asynqp.readthedocs.io/en/v0.4/.
我不明白为什么我的asyncio.sleep()
函数调用没有任何效果。代码并没有停在那里。公平地说,我实际上不明白回调是在哪个上下文中执行的,也不知道我是否可以将控制权交给事件循环(以便asyncio.sleep()
打电话才有意义)。
如果我必须使用一个怎么办aiohttp.ClientSession.get()
我的函数调用process_msg
回调函数?我无法做到这一点,因为它不是协程。一定有一种方法超出了我目前对 asyncio 的理解。
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()
我不明白为什么我的 asyncio.sleep() 函数调用没有
任何效果。
Because asyncio.sleep()
返回一个必须与事件循环结合使用的 future 对象(或async/await
语义)。
你不能使用await
简单来说def
声明,因为回调是在外部调用的async/await
上下文附加到引擎盖下的某个事件循环。换句话说,将回调风格与async/await
风格相当棘手。
不过,简单的解决方案是将工作安排回事件循环:
async def process_msg(msg):
await asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
def _process_msg(msg):
loop = asyncio.get_event_loop()
loop.create_task(process_msg(msg))
# or if loop is always the same one single line is enough
# asyncio.ensure_future(process_msg(msg))
# some code
await queue.consume(_process_msg)
请注意,其中没有递归_process_msg
功能,即身体process_msg
在中时不执行_process_msg
。内部process_msg
一旦控件返回到事件循环,函数就会被调用。
这可以用下面的代码来概括:
def async_to_callback(coro):
def callback(*args, **kwargs):
asyncio.ensure_future(coro(*args, **kwargs))
return callback
async def process_msg(msg):
# the body
# some code
await queue.consume(async_to_callback(process_msg))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)