所以 - 一件事(虽然有点)是测量常规协同例程中的时间 - 我用装饰器来完成它。
然而,当你进一步进入异步生成器时,它是另一个野兽 - 我仍在试图弄清楚 - 它是公开公开的异步迭代器方法的混合(__anext__
, asend
等...)在返回的对象中使用传统的迭代器方法,我还无法弄清楚(我刚刚打开 PEP 525 看看我是否能理解它)。
至于常规的协同例程,有一个问题:如果你创建一个可等待的类(我的装饰器),asyncio 将要求其__await__
方法返回带有 a 的内容__next__
方法,该方法将被调用。但原生 Python 协程没有__next__
:异步调用send()
在这些上 - 所以我必须进行“转置”以便能够测量时间(在协同例程中)。
import asyncio
import time
class ProfileCoro:
def __init__(self, corofunc):
self.corofunc = corofunc
def __call__(self, *args, **kw):
# WARNING: not parallel execution-safe: fix by
# keeping "self.ellapsed" in a proper contextvar
self.ellapsed = 0
self.coro = self.corofunc(*args, **kw)
return self
def __await__(self):
return self
def __iter__(self):
return self
def __next__(self):
return self.send(None)
def throw(self, exc):
print(f"Arghh!, got an {exc}")
self.coro.throw(exc)
def send(self, arg):
start = time.monotonic()
try:
result = self.coro.send(arg)
except StopIteration:
duration = time.monotonic() - start
self.ellapsed += duration
print(f"Ellapsed time in execution of {self.corofunc.__name__}: {self.ellapsed:.05f}s")
raise
duration = time.monotonic() - start
self.ellapsed += duration
return result
def __repr__(self):
return f"<ProfileCoro wrapper for {self.corofunc}>"
@ProfileCoro
async def run():
for i in range(5):
await asyncio.sleep(0.2)
# yield i
return 1
@ProfileCoro
async def walk():
for i in range(5):
time.sleep(0.3)
#yield i
return 3
async def main():
await run()
await walk()
return
asyncio.run(main())
To maybe如果我能弄清楚如何包装异步生成器,请继续。
(我认为大多数现有的分析工具都使用该语言中可用的工具来进行调试器和跟踪(通过sys.settrace()
:一切都在回调中“可见”,并且不用担心包装异步机制和异步循环发出的所有内部调用)
...
因此,这里是用于捕获异步生成器中的时间的代码。
它将得到幸福的道路 - 如果有复杂的等待类,实现或利用asend
, athrow
,这不行 - 但对于一个简单的异步生成器函数插件来说async for
声明它现在有效:
免责声明:下面的代码中可能有未使用的代码,甚至未使用的状态 - 我来回了相当多的时间来让它工作(其中很多是因为我没有尝试过这一事实)__anext__
本身必须是异步的)。尽管如此,事情还是这样:
import asyncio
import time
from functools import wraps
async def _a():
yield 1
async_generator_asend_type = type(_a().__anext__())
class ProfileCoro:
def __init__(self, corofunc):
self.corofunc = corofunc
self.measures = 0
def measure_ellapsed(func):
@wraps(func)
def wrapper(self, *args, **kw):
self.measures += 1
if self.measures > 1:
try:
return func(self, *args, **kw)
finally:
self.measures -= 1
start = time.monotonic()
try:
result = func(self, *args, **kw)
except (StopIteration, StopAsyncIteration):
self.ellapsed += time.monotonic() - start
#self.print_ellapsed()
raise
finally:
self.measures -= 1
self.ellapsed += time.monotonic() - start
return result
return wrapper
def print_ellapsed(self):
name = getattr(self.corofunc, "__name__", "inner_iterator")
print(f"Ellapsed time in execution of {name}: {self.ellapsed:.05f}s")
def __call__(self, *args, **kw):
# WARNING: not parallel execution-safe: fix by
# keeping "self.ellapsed" in a proper contextvar
self.ellapsed = 0
self.measures = 0
if not isinstance(self.corofunc, async_generator_asend_type):
self.coro = self.corofunc(*args, **kw)
else:
self.coro = self.corofunc
return self
def __await__(self):
return self
def __iter__(self):
return self
@measure_ellapsed
def __next__(self):
target = self.coro
if hasattr(target, "__next__"):
return target.__next__()
elif hasattr(target, "send"):
return target.send(None)
async def athrow(self, exc):
print(f"Arghh!, got an async-iter-mode {exc}")
return await self.async_iter.athrow(exc)
def throw(self, exc):
print(f"Arghh!, got an {exc}")
self.coro.throw(exc)
@measure_ellapsed
def send(self, arg):
return self.coro.send(arg)
def __aiter__(self):
return self
#async def asend(self, value):
...
async def aclose(self):
return await self.async_iter.close()
def close(self):
return self.async_iter.close()
async def __anext__(self):
if not hasattr(self, "async_iter"):
self.async_iter = aiter(self.coro)
self.inner_profiler = ProfileCoro(self.async_iter.__anext__())
#start = time.monotonic()
try:
result = await self.inner_profiler()
except StopAsyncIteration:
#self.print_ellapsed()
raise
finally:
self.ellapsed += self.inner_profiler.ellapsed
return result
def __repr__(self):
return f"<ProfileCoro wrapper for {self.corofunc}>"
@ProfileCoro
async def run():
for i in range(5):
await asyncio.sleep(0.05)
# yield i
return 1
@ProfileCoro
async def walk():
for i in range(5):
time.sleep(0.05)
#yield i
return 3
@ProfileCoro
async def irun():
for i in range(5):
await asyncio.sleep(0.05)
yield i
@ProfileCoro
async def iwalk():
for i in range(5):
time.sleep(0.05)
yield i
async def main():
await run()
run.print_ellapsed()
await walk()
walk.print_ellapsed()
async for _ in irun():
print(".", end="", flush=True)
irun.print_ellapsed()
async for _ in iwalk():
pass
iwalk.print_ellapsed()
return
asyncio.run(main())