我正在尝试学习如何(惯用地)使用Python 3.4asyncio
。我最大的障碍是如何“链接”不断消耗数据的协程,用数据更新状态,并允许另一个协程使用该状态。
我期望从这个示例程序中观察到的行为只是定期报告从子进程接收到的数字总和。报告的发生率应大致与Source
对象从子进程接收数字。报告函数中的 IO 阻塞不应阻塞子进程的读取。如果报告功能阻塞的时间比从子进程读取迭代的时间长,我不在乎它是否向前跳过或立即报告一堆;但应该有尽可能多的迭代reporter()
因为有expect_exact()
在足够长的时间范围内。
#!/usr/bin/python3
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
self.sum = 0
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
self.sum += int(p.before)
p.terminate()
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source # ???
print("New sum is: {:d}".format(new_sum))
# Potentially some other blocking operation
yield from limited_throughput.write(new_sum)
def main():
loop = asyncio.get_event_loop()
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.async(reporter(source))
loop.run_until_complete(source.run())
loop.close()
if __name__ == '__main__':
main()
这个例子需要pexpect
从 git 安装;你可以很容易地更换run()
with:
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
while self.flag.is_set():
value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
self.sum += value
但我感兴趣的真正子进程需要在pty
,我认为这意味着提供的子进程传输/协议框架asyncio
对此还不够。要点是异步活动的源是一个协程,可以与yield from
.
请注意,reporter()
此示例中的函数不是有效代码;我的问题是我不知道里面应该放什么。理想情况下我想保留reporter()
代码独立于run()
;本练习的目的是了解如何使用以下组件将更复杂的程序分解为更小的代码单元asyncio
.
有没有一种方法可以构建这种行为asyncio
module?