如何从三个 ReceiveStream 中一次读取一行?

2024-04-20

异步有StreamReader.readline() https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamReader.readline,允许类似:

while True:
    line = await reader.readline()
    ...

(我没看到async for在 asyncio 中可用,但这将是明显的演变)

我怎样才能达到三重奏的同等效果?

我在 trio 0.9 中没有直接看到任何对此的高级支持。我所看到的只是ReceiveStream.receive_some() https://trio.readthedocs.io/en/latest/reference-io.html#trio.abc.ReceiveStream.receive_some它返回任意大小的二进制块;对我来说,解码并将其转换为逐行的东西似乎并不简单。是否有我可以使用的标准库函数或代码片段?我发现 io stdlib 模块看起来很有前途,但我没有看到任何提供“feed”方法的方法。


你是对的,目前 Trio 中没有对此提供高级支持。应该有某物,尽管我不是 100% 确定它应该是什么样子。我打开了an issue https://github.com/python-trio/trio/issues/796来讨论它。

同时,您的实现看起来很合理。

如果你想让它更加健壮,你可以(1)使用bytearray代替bytes对于你的缓冲区,要使追加和删除分摊为 O(n) 而不是 O(n^2),(2) 对最大行长度设置限制,因此邪恶的同行不能强迫你浪费无限内存缓冲无限长行,(3) 恢复每个调用find在最后一个停止的地方,而不是每次从头开始,再次避免 O(n^2) 行为。如果您只与合理的线路长度和行为良好的同行打交道,那么这些都不是超级重要的,但也没有坏处。

这是代码的调整版本,尝试合并这三个想法:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(您可以根据您喜欢的任何许可证随意使用。)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从三个 ReceiveStream 中一次读取一行? 的相关文章

  • python:查找围绕某个 GPS 位置的圆的 GPS 坐标的优雅方法

    我有一组以十进制表示的 GPS 坐标 并且我正在寻找一种方法来查找每个位置周围半径可变的圆中的坐标 这是一个例子 http green and energy com downloads test circle html我需要什么 这是一个圆
  • 如何手动计算分类交叉熵?

    当我手动计算二元交叉熵时 我应用 sigmoid 来获取概率 然后使用交叉熵公式并平均结果 logits tf constant 1 1 0 1 2 labels tf constant 0 0 1 1 1 probs tf nn sigm
  • 中断 Select 以添加另一个要在 Python 中监视的套接字

    我正在 Windows XP 应用程序中使用 TCP 实现点对点 IPC 我正在使用select and socketPython 2 6 6 中的模块 我有三个 TCP 线程 一个读取线程通常会阻塞select 一个通常等待事件的写入线程
  • 使用特定的类/函数预加载 Jupyter Notebook

    我想预加载一个笔记本 其中包含我在另一个文件中定义的特定类 函数 更具体地说 我想用 python 来做到这一点 比如加载一个配置文件 包含所有相关的类 函数 目前 我正在使用 python 生成笔记本并在服务器上自动启动它们 因为不同的
  • Python 中的哈希映射

    我想用Python实现HashMap 我想请求用户输入 根据他的输入 我从 HashMap 中检索一些信息 如果用户输入HashMap的某个键 我想检索相应的值 如何在 Python 中实现此功能 HashMap
  • Python 中的舍入浮点问题

    我遇到了 np round np around 的问题 它没有正确舍入 我无法包含代码 因为当我手动设置值 而不是使用我的数据 时 返回有效 但这是输出 In 177 a Out 177 0 0099999998 In 178 np rou
  • 删除flask中的一对一关系

    我目前正在使用 Flask 开发一个应用程序 并且在删除一对一关系中的项目时遇到了一个大问题 我的模型中有以下结构 class User db Model tablename user user id db Column db String
  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • 独立滚动矩阵的行

    我有一个矩阵 准确地说 是 2d numpy ndarray A np array 4 0 0 1 2 3 0 0 5 我想滚动每一行A根据另一个数组中的滚动值独立地 r np array 2 0 1 也就是说 我想这样做 print np
  • Python 2:SMTPServerDisconnected:连接意外关闭

    我在用 Python 发送电子邮件时遇到一个小问题 me my email address you recipient s email address me email protected cdn cgi l email protectio
  • 如何使用 Mysql Python 连接器检索二进制数据?

    如果我在 MySQL 中创建一个包含二进制数据的简单表 CREATE TABLE foo bar binary 4 INSERT INTO foo bar VALUES UNHEX de12 然后尝试使用 MySQL Connector P
  • 如何使用python在一个文件中写入多行

    如果我知道要写多少行 我就知道如何将多行写入一个文件 但是 当我想写多行时 问题就出现了 但是 我不知道它们会是多少 我正在开发一个应用程序 它从网站上抓取并将结果的链接存储在文本文件中 但是 我们不知道它会回复多少行 我的代码现在如下 r
  • 如何通过 TLS 1.2 运行 django runserver

    我正在本地 Mac OS X 机器上测试 Stripe 订单 我正在实现这段代码 stripe api key settings STRIPE SECRET order stripe Order create currency usd em
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • 加快网络抓取速度

    我正在使用一个非常简单的网络抓取工具抓取 23770 个网页scrapy 我对 scrapy 甚至 python 都很陌生 但设法编写了一个可以完成这项工作的蜘蛛 然而 它确实很慢 爬行 23770 个页面大约需要 28 小时 我看过scr
  • import matplotlib.pyplot 给出 AttributeError: 'NoneType' 对象没有属性 'is_interactive'

    我尝试在 Pycharm 控制台中导入 matplotlib pyplt import matplotlib pyplot as plt 然后作为回报我得到 Traceback most recent call last File D Pr
  • 如何使用原始 SQL 查询实现搜索功能

    我正在创建一个由 CS50 的网络系列指导的应用程序 这要求我仅使用原始 SQL 查询而不是 ORM 我正在尝试创建一个搜索功能 用户可以在其中查找存储在数据库中的书籍列表 我希望他们能够查询 书籍 表中的 ISBN 标题 作者列 目前 它
  • 如何在 pygtk 中创建新信号

    我创建了一个 python 对象 但我想在它上面发送信号 我让它继承自 gobject GObject 但似乎没有任何方法可以在我的对象上创建新信号 您还可以在类定义中定义信号 class MyGObjectClass gobject GO
  • Django-tables2 列总计

    我正在尝试使用此总结列中的所有值文档 https github com bradleyayers django tables2 blob master docs pages column headers and footers rst 但页
  • 使用 z = f(x, y) 形式的 B 样条方法来拟合 z = f(x)

    作为一个潜在的解决方案这个问题 https stackoverflow com questions 76476327 how to avoid creating many binary switching variables in gekk

随机推荐

  • 23,148,855,308,184,500 是一个神奇的数字,还是纯粹的偶然?

    Locked 这个问题及其答案是locked help locked posts因为这个问题是题外话 但却具有历史意义 目前不接受新的答案或互动 新闻报道如this one http news bbc co uk 1 hi world am
  • PyTorch:如何检查训练期间某些权重是否没有改变?

    如何检查 PyTorch 训练期间某些权重是否未更改 据我了解 一种选择可以是在某些时期转储模型权重 并检查它们是否通过迭代权重进行更改 但也许有一些更简单的方法 有两种方法可以解决这个问题 First for name param in
  • 如何在 Recompose 中使用 withHandlers 将引用添加到功能组件并在 ScrollView 上调用 ScrollTo?

    我的具体目标是使用滚动到方法 http facebook github io react native docs scrollview html methodsScrollView 的但保持功能组件结构 更一般地说 这需要获取对当前组件的引
  • psql:符号查找错误:psql:未定义符号:PQsetErrorContextVisibility

    我将 postgres 版本从 9 2 24 切换到 9 6 因为我需要 jsonb 兼容性以及其他最新功能 我在 centos 7 上运行虚拟机 我决定擦除所有现有的 postgres 实例 因为它是临时的 所以几乎是空的 然后安装了 9
  • 签名者无效错误

    我为应用程序商店发行版构建 但是当尝试在设备上 而不是开发中 测试它时 它给出以下警告 应用程序 myapp 未安装在 iPhone 用户的 iPhone 上 因为签名者无效 任何解决这个问题的建议 此致 当您想要测试应用程序的生产版本 而
  • Unity 协程在后台停止

    我的问题如下 目前我在 Android iOS 游戏中运行了几个协程 但是当我将游戏发送到后台以便用手机尝试其他操作时 这些协程会停止 只有在我返回游戏后才恢复 有什么办法可以让协程在游戏在后台时继续运行吗 Android 将按设计暂停您的
  • 官方 Tensorflow 文档有 pdf 格式吗? (运行Windows)

    我无法找到 Tensorflow 的 pdf 格式文档 API 或教程 我指的是官方文档 而不是要求书籍推荐 具体为pdf格式 供离线本地参考和学习 一个问题是我运行的是 Windows 因此用于创建的可用工具集 它们动态地受到限制 或与通
  • 使用 gradlew 和 gradle 的区别

    使用有什么区别gradlew and gradle或者它们是相同的吗 不同之处在于 gradlew表明您正在使用 gradle 包装器 包装器通常是项目的一部分 它有助于 gradle 的安装 如果您在没有包装器的情况下使用 gradle
  • Request.UserHostAddress返回负载均衡器的IP地址

    我的网站中有一行关键代码可以在我们的开发环境中运行 但不能在生产环境中运行 好吧 我说它在开发中起作用 但事实是它给了 1 这是 IPv6 环回地址 无论如何 我想要做的是捕获访问该站点的用户的 IP 地址 因此 我使用Request Us
  • 以编程方式登录网站的技术

    我正在尝试自动登录 Photobucket 以供 API 使用 用于需要使用存储的凭据自动下载照片的项目 API 生成一个用于登录的 URL 并且使用 Firebug 我可以查看正在发送 接收的请求和响应 我的问题是 如何使用 HttpWe
  • 蓝牙连接:MODE_IN_CALL 与 MODE_IN_COMMUNICATION

    我有一个应用程序需要连接到无线蓝牙耳机来收集原始音频 MODE IN CALL 适用于某些设备 MODE IN COMMUNICATION 适用于其他设备 麦克风可以工作 但我失去了音频 反之亦然 我使用的是 Nexus 5x 和 Sams
  • Ajax.BeginForm 的 ASP .NET MVC 问题用 JSON 数据替换所有视图内容

    我在 VS 2015 中创建了一个示例 ASP NET MVC 网站 在视图中我使用扩展 Ajax BeginForm 将登录凭据发布到控制器 并且在 OnSuccess 回调上我想检查服务器错误 如果有则显示错误用户其他 重定向到主页 这
  • 当我向 /common/oauth2/v2.0/token 发出 Ajax 请求时出现 CORS 错误

    当我向以下对象发出 Ajax 请求时出现 CORS 错误https login microsoftonline com common oauth2 v2 0 token从我的申请中 下面是我正在使用的代码示例 var inputData g
  • 本地化 Android DatePickerDialog

    是否可以配置 android app DatePickerDialog 以便轻松本地化为欧洲格式 交换日期和月份并将英文按钮名称与本地化按钮名称交换 DatePickerDialog 默认情况下已本地化 我在真实设备上尝试过 对话框的本地化
  • 无法连接到 Subversion 本地存储库

    我已在 Windows 7 64 位上安装了 Subversion 1 8 8 和 TortoiseSVN 1 8 5 64 位 我使用 TortoiseSVN 在 C Projects SVNRepository 创建了一个本地存储库 在
  • 我可以从哪里开始使用可编程硬件?

    一段时间以来 我一直渴望至少学习一点有关硬件编程的知识 并想在这里提出要求以获得一些起点 我是一位相当有成就的程序员 具有 Delphi 和 Objective c 经验 但从未听过设备端口 中断 我什至不知道术语 更不用说对硬件进行编程了
  • 动态添加注释到现有类

    我有以下课程 public class Person 我想创建另一个类似这样的类 SomeAnnotation public class Person 通过像这样的简单方法 public static Class addAnnotation
  • IsPrimitive 不包含可为 null 的原始值

    我想检查类型是否是原始类型并使用以下代码 return type IsValueType type IsPrimitive 只要原始 int 可为空 这就可以正常工作 例如 int 如何检查该类型是否为可为空的原始类型 供参考 type I
  • 删除代码隐藏中的 asp.net 事件

    我想删除代码后面的事件 比如我的控件是这样的
  • 如何从三个 ReceiveStream 中一次读取一行?

    异步有StreamReader readline https docs python org 3 library asyncio stream html asyncio StreamReader readline 允许类似 while Tr