我正在尝试测试异步代码,但由于某些任务之间的复杂连接而遇到了麻烦。
我需要的上下文是一些代码,它与另一个进程写入文件的同时并行读取文件。代码中有一些逻辑,读取被截断的记录会使其后退并wait()
on an asyncio.Condition
稍后由inotify https://man7.org/linux/man-pages/man7/inotify.7.html事件。当另一个进程完成将来的写入时,此代码应该让它通过重新读取记录来恢复。我特别想测试一下这种恢复是否有效。
所以我的计划是:
- 写入部分文件
- 运行事件循环,直到它根据条件暂停
- 写入文件的其余部分
- 运行事件循环直至完成
我原以为这是答案:检测空闲异步事件循环 https://stackoverflow.com/questions/47369252/detect-an-idle-asyncio-event-loop
然而,试验表明它退出得太早了:
import asyncio
import random
def test_ping_pong():
async def ping_pong(idx: int, oth_idx: int):
for i in range(random.randint(100, 1000)):
counters[idx] += 1
async with conditions[oth_idx]:
conditions[oth_idx].notify()
async with conditions[idx]:
await conditions[idx].wait()
async def detect_iowait():
loop = asyncio.get_event_loop()
rsock, wsock = socket.socketpair()
wsock.close()
try:
await loop.sock_recv(rsock, 1)
finally:
rsock.close()
conditions = [asyncio.Condition(), asyncio.Condition()]
counters = [0, 0]
loop = asyncio.get_event_loop()
loop.create_task(ping_pong(0, 1))
loop.create_task(ping_pong(1, 0))
loop.run_until_complete(loop.create_task(detect_iowait()))
assert counters[0] > 10
assert counters[1] > 10
在深入研究了 python 事件循环的源代码后,我发现没有任何公开的内容可以公开执行此操作。
然而,可以使用_ready
双端队列由创建BaseEventLoop
. See here https://github.com/python/cpython/blob/5efb1a77e75648012f8b52960c8637fc296a5c6d/Lib/asyncio/base_events.py#L390。这包含立即准备运行的每个任务。当任务运行时,它会从_ready
双端队列。当挂起的任务被另一个任务释放时(例如通过调用future.set_result() https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_result) 挂起的任务立即添加回双端队列。这是从 python 3.5 开始就存在的。
您可以做的一件事是重复注入回调以检查中有多少项目_ready
。当所有other任务被挂起,回调运行时 dqueue 中将没有任何内容。
事件循环的每次迭代最多运行一次回调:
async def wait_for_deadlock(empty_loop_threshold: int = 0):
def check_for_deadlock():
nonlocal empty_loop_count
# pylint: disable=protected-access
if loop._ready:
empty_loop_count = 0
loop.call_soon(check_for_deadlock)
elif empty_loop_count < empty_loop_threshold:
empty_loop_count += 1
loop.call_soon(check_for_deadlock)
else:
future.set_result(None)
empty_loop_count = 0
loop = asyncio.get_running_loop()
future = loop.create_future()
asyncio.get_running_loop().call_soon(check_for_deadlock)
await future
在上面的代码中empty_loop_threshold
在大多数情况下并不是真正必要的,但在任务与 IO 通信的情况下存在。例如,如果一个任务通过 IO 与另一个任务通信,则可能会有一个时刻所有任务都被挂起,即使其中一个任务已准备好读取数据。环境empty_loop_threshold = 1
应该解决这个问题。
使用这个相对简单。你可以:
loop.run_until_complete(wait_for_deadlock())
或者按照我的问题中的要求:
def some_test():
async def async_test():
await wait_for_deadlock()
inject_something()
await wait_for_deadlock()
loop = loop.get_event_loop()
loop.create_task(task_to_test())
loop.run_until_complete(loop.create_task(async_test)
assert something
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)