我正在习惯 asyncio,并发现任务处理非常好,但将异步库与传统 io 库混合起来可能很困难。我当前面临的问题是如何正确解码异步 StreamReader。
最简单的解决方案是read()
字节字符串块,然后解码每个块 - 请参阅下面的代码。 (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一个方法进行处理):
import asyncio
import aiohttp
async def get_data(port):
url = 'http://localhost:{}/'.format(port)
r = await aiohttp.get(url)
stream = r.content
while not stream.at_eof():
data = await stream.read(4)
print(data.decode('utf-8'))
这工作得很好,直到有一个 utf-8 字符被分割在太多的块之间。例如,如果响应是b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
,那么读取 3 的块将起作用,但 4 的块将不起作用(如\xc3
and \x9f
位于不同的块中并解码以结尾的块\xc3
会引发以下错误:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
我研究了这个问题的正确解决方案,至少在阻塞世界中,似乎是 io.TextIOWrapper 或 codecs.StreamReaderWriter (其中讨论了它们的差异)PEP 0400 https://www.python.org/dev/peps/pep-0400/)。然而,这两者都依赖于典型的阻塞流。
我花了 30 分钟搜索 asyncio 的示例,并不断找到我的decode() 解决方案。有谁知道更好的解决方案或者这是 python 的 asyncio 中缺少的功能吗?
作为参考,以下是将两个“标准”解码器与异步流结合使用的结果。
使用编解码器流读取器:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
例外:
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(它直接调用 read(),而不是yield from
or await
it)
我还尝试使用 io.TextIOWrapper 包装流:
stream = TextIOWrapper(r.content)
但这会导致以下结果:
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
附:如果您想要这方面的示例测试用例,请查看这个要点 https://gist.github.com/ethanfrey/75e58db27095936b9e5e。您可以使用python3.5运行它来重现该错误。如果将块大小从 4 更改为 3(或 30),它将正常工作。
EDIT
接受的答案就像魅力一样解决了这个问题。谢谢!如果其他人有这个问题,这里是我制作的一个简单的包装类,用于处理 StreamReader 上的解码:
import codecs
class DecodingStreamReader:
def __init__(self, stream, encoding='utf-8', errors='strict'):
self.stream = stream
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
async def read(self, n=-1):
data = await self.stream.read(n)
if isinstance(data, (bytes, bytearray)):
data = self.decoder.decode(data)
return data
def at_eof(self):
return self.stream.at_eof()