使用 StreamReader 异步解码 utf-8

2024-04-04

我正在习惯 asyncio,并发现任务处理非常好,但将异步库与传统 io 库混合起来可能很困难。我当前面临的问题是如何正确解码异步 StreamReader。

最简单的解决方案是read()字节字符串块,然后解码每个块 - 请参阅下面的代码。 (在我的程序中,我不会打印每个块,而是将其解码为字符串并将其发送到另一个方法进行处理):

import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))

这工作得很好,直到有一个 utf-8 字符被分割在太多的块之间。例如,如果响应是b'M\xc3\xa4dchen mit Bi\xc3\x9f\n',那么读取 3 的块将起作用,但 4 的块将不起作用(如\xc3 and \x9f位于不同的块中并解码以结尾的块\xc3会引发以下错误:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

我研究了这个问题的正确解决方案,至少在阻塞世界中,似乎是 io.TextIOWrapper 或 codecs.StreamReaderWriter (其中讨论了它们的差异)PEP 0400 https://www.python.org/dev/peps/pep-0400/)。然而,这两者都依赖于典型的阻塞流。

我花了 30 分钟搜索 asyncio 的示例,并不断找到我的decode() 解决方案。有谁知道更好的解决方案或者这是 python 的 asyncio 中缺少的功能吗?

作为参考,以下是将两个“标准”解码器与异步流结合使用的结果。

使用编解码器流读取器:

r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)

例外:

File "echo_client.py", line 13, in get_data
  data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
  data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator

(它直接调用 read(),而不是yield from or await it)

我还尝试使用 io.TextIOWrapper 包装流:

stream = TextIOWrapper(r.content)

但这会导致以下结果:

File "echo_client.py", line 10, in get_data
  stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'

附:如果您想要这方面的示例测试用例,请查看这个要点 https://gist.github.com/ethanfrey/75e58db27095936b9e5e。您可以使用python3.5运行它来重现该错误。如果将块大小从 4 更改为 3(或 30),它将正常工作。

EDIT

接受的答案就像魅力一样解决了这个问题。谢谢!如果其他人有这个问题,这里是我制作的一个简单的包装类,用于处理 StreamReader 上的解码:

import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof() 

您可以使用增量解码器 https://docs.python.org/3/library/codecs.html#codecs.IncrementalDecoder:

Utf8Decoder = codecs.getincrementaldecoder('utf-8')

以你的例子:

decoder = Utf8Decoder(error='strict')
while not stream.at_eof():
    data = await stream.read(4)
    print(decoder.decode(data), end='')

Output:

Mädchen mit Biß
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 StreamReader 异步解码 utf-8 的相关文章

  • Python Popen 与 psexec 挂起 - 不良结果

    我对 subprocess Popen 和我认为是管道的问题有疑问 我有以下代码块 从 cli 运行时 100 都不会出现问题 p subprocess Popen psexec serverName get cmd c ver echo
  • 如何在 AWS CDK 创建的 Python Lambda 函数中安装外部模块?

    我在 Cloud9 中使用 Python AWS CDK 并且我部署简单的 Lambda 函数那应该是发送 API 请求到 Atlassian 的 API当对象上传到 S3 存储桶时 也是由 CDK 创建的 这是我的 CDK 堆栈代码 fr
  • Python - 将宽字符字符串从二进制文件转换为 Python unicode 字符串

    这是漫长的一天 我有点困惑 我正在读取一个包含大量宽字符字符串的二进制文件 我想将它们转储为 Python unicode 字符串 为了解压非字符串数据 我使用 struct 模块 但我不知道如何对字符串执行相同的操作 例如 阅读 系列 一
  • 如何正确地将 MIDI 刻度转换为毫秒?

    我正在尝试将 MIDI 刻度 增量时间转换为毫秒 并且已经找到了一些有用的资源 MIDI Delta 时间刻度到秒 http www lastrayofhope co uk 2009 12 23 midi delta time ticks
  • 在 HTTP 标头中发送 UTF-8 值会导致 Mojibake

    我想使用 servlet 发送阿拉伯语数据HTTPServletResponse给客户 我正在尝试这个 response setCharacterEncoding UTF 8 response setHeader Info arabicWo
  • 如何使用 imaplib 获取“消息 ID”

    我尝试获取一个在操作期间不会更改的唯一 ID 我觉得UID不好 所以我认为 Message ID 是正确的 但我不知道如何获取它 我只知道 imap fetch uid XXXX 有人有解决方案吗 来自 IMAP 文档本身 IMAP4消息号
  • 将数据帧行转换为字典

    我有像下面的示例数据这样的数据帧 我正在尝试将数据帧中的一行转换为类似于下面所需输出的字典 但是当我使用 to dict 时 我得到了索引和列值 有谁知道如何将行转换为像所需输出那样的字典 任何提示都非常感激 Sample data pri
  • Argparse nargs="+" 正在吃位置参数

    这是我的解析器配置的一小部分 parser add argument infile help The file to be imported type argparse FileType r default sys stdin parser
  • Pandas 中允许重复列

    我将一个大的 CSV 包含股票财务数据 文件分割成更小的块 CSV 文件的格式不同 像 Excel 数据透视表之类的东西 第一列的前几行包含一些标题 公司名称 ID 等在以下列中重复 因为一家公司有多个属性 而不是一家公司只有一栏 在前几行
  • 从零开始的 numpy 形状意味着什么

    好的 我发现数组的形状中可以包含 0 对于将 0 作为唯一维度的情况 这对我来说是有意义的 它是一个空数组 np zeros 0 但如果你有这样的情况 np zeros 0 100 让我很困惑 为什么这么定义呢 据我所知 这只是表达空数组的
  • Pandas 数据帧到 numpy 数组 [重复]

    这个问题在这里已经有答案了 我对 Python 很陌生 经验也很少 我已经设法通过复制 粘贴和替换我拥有的数据来使一些代码正常工作 但是我一直在寻找如何从数据框中选择数据 但无法理解这些示例并替换我自己的数据 总体目标 如果有人真的可以帮助
  • 从 python 发起 SSH 隧道时出现问题

    目标是在卫星服务器和集中式注册数据库之间建立 n 个 ssh 隧道 我已经在我的服务器之间设置了公钥身份验证 因此它们只需直接登录而无需密码提示 怎么办 我试过帕拉米科 它看起来不错 但仅仅建立一个基本的隧道就变得相当复杂 尽管代码示例将受
  • 将 2D NumPy 数组按元素相乘并求和

    我想知道是否有一种更快的方法 专用 NumPy 函数来执行 2D NumPy 数组的元素乘法 然后对所有元素求和 我目前使用np sum np multiply A B 其中 A B 是相同维度的 NumPy 数组m x n 您可以使用np
  • 使用 NumPy 将非均匀数据从文件读取到数组中

    假设我有一个如下所示的文本文件 33 346 1223 10 23 11 23 12 23 13 23 14 23 15 23 16 24 10 24 11 24 12 24 13 24 14 24 15 24 16 25 14 25 15
  • 使用yield 进行字典理解

    作为一个人为的例子 myset set a b c d mydict item yield join item s for item in myset and list mydict gives as cs bs ds a None b N
  • 如何在 OSX 上安装 numpy 和 scipy?

    我是 Mac 新手 请耐心等待 我现在使用的是雪豹 10 6 4 我想安装numpy和scipy 所以我从他们的官方网站下载了python2 6 numpy和scipy dmg文件 但是 我在导入 numpy 时遇到问题 Library F
  • 如何为每个屏幕添加自己的 .py 和 .kv 文件?

    我想为每个屏幕都有一个单独的 py 和 kv 文件 应通过 main py main kv 中的 ScreenManager 选择屏幕 设计应从文件 screen X kv 加载 类等应从文件 screen X py 加载 Screens
  • 限制 django 应用程序模型中的单个记录?

    我想使用模型来保存 django 应用程序的系统设置 因此 我想限制该模型 使其只能有一条记录 极限怎么办 尝试这个 class MyModel models Model onefield models CharField The fiel
  • 如何读取Python字节码?

    我很难理解 Python 的字节码及其dis module import dis def func x 1 dis dis func 上述代码在解释器中输入时会产生以下输出 0 LOAD CONST 1 1 3 STORE FAST 0 x
  • Scrapy Spider不存储状态(持久状态)

    您好 有一个基本的蜘蛛 可以运行以获取给定域上的所有链接 我想确保它保持其状态 以便它可以从离开的位置恢复 我已按照给定的网址进行操作http doc scrapy org en latest topics jobs html http d

随机推荐

  • Apache Beam 中的异步 API 调用

    正如标题所说 我想使用 python 在 apache beam 中进行异步 API 调用 目前 我正在为 Pcollection 中的每个元素调用 DoFn 内的 API 自由度代码 class textapi call beam DoF
  • Material UI 中的相同高度的卡片

    尝试使用 3 张水平卡片 但高度相同且响应灵敏 Like 卡A 卡 B 卡C 覆盖渲染组件
  • 切换 elseif 来切换 case

    我们如何将下面的 if else 语句切换为 switch case 语句 任何人都可以帮忙解决这个问题 if Webcc1 Contains licensePartID dtExpiryDate dtActivatedDate AddYe
  • 在 Android APK 中嵌入版本详细信息

    我的代码存储在SVN版本控制中 我使用 Eclipse 来构建我的 Android 应用程序 在我的应用程序中 我有一个关于框 我想在其中显示正确的源代码控制修订版 标签 任何内容 有没有一种方法可以自动执行此操作 以便我在 关于 框中的版
  • 如何在 Ruby 中拆分字符串并获取除第一个之外的所有项目?

    字符串是ex test1 test2 test3 test4 test5 当我使用 ex split first 它返回 test1 现在我想获取剩余的项目 即 test2 test3 test4 test5 如果我使用 ex split
  • 默认情况下,鼠标单击是否会将键盘焦点带到可聚焦控件上?

    这个问题看起来很奇怪 但根据我的经验 我已经习惯了只需用鼠标单击即可将键盘焦点设置到可聚焦元素 但是 UserControl 具有以下属性Focusable true and IsTabStop true让我感到惊讶的是 它通过 Tab 获
  • 如何监控 TensorFlow 估计器训练中的验证损失?

    我想问一个关于如何在 TensorFlow 估计器的训练过程中监控验证损失的问题 我查过类似的问题 估计器训练期间的验证 https stackoverflow com questions 45417502 validation durin
  • 如何在 MATLAB 中执行此累积和?

    我想计算第 2 列中的值的累积和dat txt下面是第 1 列中的每个字符串 所需的输出显示为dat2 txt dat txt dat2 txt 1 20 1 20 20 20 0 1 22 1 22 42 20 22 1 20 1 20
  • 解析树和抽象语法树(AST)有什么区别?

    它们是由编译过程的不同阶段生成的吗 或者它们只是同一事物的不同名称 这是基于表达评估器 http www antlr3 org works help tutorial calculator html泰伦斯 帕尔的语法 本例的语法 gramm
  • 修复了左侧的侧边栏菜单和顶部的固定标题

    所以我想做的是一个固定的侧边栏 顶部有一个固定的菜单 中间的内容可以滚动 body html height 100 margin 0 aside background 90EE90 height 100 left 0 position fi
  • validateRequest="false" 不起作用,即使 requestValidationMode="2.0"

    我有一个在 Visual Studio dev fabric azure 项目 中运行的 ASP NET 网站 并且正在使用 ACS 和 WIF 我的身份验证过程无法正常工作 因为登录后我收到以下信息 A potentially dange
  • Node.js 中的客户端-服务器通信

    我最近开始使用 Node js 来制作一个在线游戏 用于教育 通过阅读各种教程 我想出了如下所示的简单代码 使用我拥有的代码 我能够进行客户端 服务器通信 这基本上是我制作游戏所需的全部内容 只有一个问题 只有客户端可以发起会话 而服务器只
  • 在 gridview 中显示数据库中的数据

    有谁知道如何将数据库条目放入android中的gridview中 或者是否有一个教程解释了如何做到这一点 请通过提供完整的示例来帮助我 您将需要使用游标适配器 http developer android com reference and
  • 当片段更改时如何更新视图?

    我有一个活动 其中有 2 个 sherlockfragment 前两个页面显示带有自定义列表视图的片段 这些视图是使用 AsyncTask 从服务器的 xml 构建的 但是 当应用程序运行时 仅显示一个列表视图 另一页只是空白 public
  • new URL() - WHATWG URL API

    我正在摆弄节点 并试图获取 URL 类的实例 因为这些方便的属性 喜欢 const URL require url http createServer request response gt let uri new URL request
  • 如何生成具有给定限制和平均值的随机数? [复制]

    这个问题在这里已经有答案了 我想生成一系列 30 个随机数 其中上限为 40 下限为 12 平均值为 23 总结 n 30 最小值 12 最大值 40 平均值 23 提前致谢 如果您想要连续分布 您可以获得最小值和最大值之间的任何浮点值 一
  • 如何在 UML 中指定“一次一个”?

    我正在制作一个类图Classroom and a Course class 我怎样才能表明Classroom只能有一个Course一次在其中吗 我知道我可以使用多重性来指定教室可以只开设一门课程 但这并不能完全指定在不同时间可以有除该一门课
  • 列表中情侣的乘积之和

    我想找出列表中情侣的乘积之和 例如给出一个列表 1 2 3 4 我想要得到的是答案 1 2 1 3 1 4 2 3 2 4 3 4 我使用暴力来完成此操作 它会给我带来非常大的列表的超时错误 我想要一种有效的方法来做到这一点 请告诉我 我该
  • setFont(Times-Roman) 不能缺少 T1 文件吗?

    我有错误 Can t find pfb for face Times Roman Error reportlab graphics renderPM RenderPMError Can t setFont Times Roman missi
  • 使用 StreamReader 异步解码 utf-8

    我正在习惯 asyncio 并发现任务处理非常好 但将异步库与传统 io 库混合起来可能很困难 我当前面临的问题是如何正确解码异步 StreamReader 最简单的解决方案是read 字节字符串块 然后解码每个块 请参阅下面的代码 在我的