使用 asyncio 处理超时

2024-04-27

免责声明:这是我第一次尝试asyncio module.

我在用着asyncio.wait通过以下方式尝试支持等待一组异步任务的所有结果的超时功能。这是一个更大的库的一部分,因此我省略了一些不相关的代码。

请注意,该库已经支持提交任务并使用 ThreadPoolExecutors 和 ProcessPoolExecutors 使用超时,因此我对使用这些选项的建议或关于为什么我这样做的问题并不真正感兴趣asyncio。上代码...

import asyncio
from contextlib import suppress

... 

class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError

起初我并不担心超时时取消待处理的任务,但后来我收到了警告Task was destroyed but it is pending!程序退出时或loop.close。经过一番研究后,我发现了多种取消任务并等待它们实际被取消的方法:

选项1:

[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)

选项2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))

选项 3:

# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task

选项 4:

某种 while 循环,如this https://stackoverflow.com/a/40022171/10682164回答。似乎我的其他选择更好,但包括完整性。


到目前为止,选项 1 和 2 似乎都运行良好。任何一个选项都可能是“正确的”,但是asyncio经过多年的发展,网上的例子和建议要么已经过时,要么变化很大。所以我的问题是...

问题1

选项 1 和选项 2 之间有任何实际差异吗?我知道run_until_complete将运行直到未来完成,因此由于选项 1 按特定顺序循环,我想如果较早的任务需要更长的时间才能实际完成,它的行为可能会有所不同。我尝试查看 asyncio 源代码来了解是否asyncio.wait只是在幕后有效地对其任务/未来做了同样的事情,但这并不明显。

问题2

我假设如果其中一个任务正在进行长时间运行的阻塞操作,它实际上可能不会立即取消?也许这仅仅取决于正在使用的底层操作或库是否会立即引发 CancelledError ?也许为 asyncio 设计的库永远不应该发生这种情况?

由于我试图在这里实现超时功能,因此我对此有些敏感。如果可能的话,这些事情可能需要很长时间才能取消,我会考虑打电话cancel并且不等待它实际发生,或者设置一个非常短的超时来等待取消完成。

问题3

是否可以loop.run_until_complete(或者实际上,底层的调用async.wait) 返回值unfinished除了超时之外还有其他原因吗?如果是这样,我显然必须稍微调整一下我的逻辑,但从docs https://docs.python.org/3/library/asyncio-task.html#asyncio.wait看来那是不可能的。


选项 1 和选项 2 之间有任何实际差异吗?

不会。选项 2 看起来更好,而且效率可能稍微更高,但它们的净效果是相同的。

I know run_until_complete将运行直到未来完成,因此由于选项 1 按特定顺序循环,我想如果较早的任务需要更长的时间才能实际完成,它的行为可能会有所不同。

乍一看似乎是这样,但实际上并非如此,因为loop.run_until_complete runs all提交给循环的任务,而不仅仅是作为参数传递的任务。它仅仅stops一旦提供的等待完成 - 这就是“运行直到完成”所指的。循环调用run_until_complete已经调度的任务就像下面的异步代码:

ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t

这在语义上又等同于以下线程代码:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()

换句话说,await t and run_until_complete(t)阻塞直到t已完成,但允许其他一切 - 例如之前使用计划的任务asyncio.create_task()在那段时间也跑步。因此,总运行时间将等于最长任务的运行时间,而不是它们的总和。例如,如果第一个任务恰好需要很长时间,那么所有其他任务都会同时完成,并且他们的等待根本不会休眠。

所有这些仅适用于之前已安排的等待任务。如果您尝试将其应用于协程,它将不起作用:

# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()

对于 asyncio 初学者来说,这通常是一个症结所在,他们编写与最后一个 asyncio 示例等效的代码,并期望它能够并行运行。

我尝试查看 asyncio 源代码来了解是否asyncio.wait只是在幕后有效地对其任务/未来做了同样的事情,但这并不明显。

asyncio.wait只是一个方便的 API,它做两件事:

  • 将输入参数转换为实现的东西Future。对于协程来说,这意味着它将它们提交到事件循环,就像create_task,这使得它们能够独立运行。如果您像您一样一开始就给它任务,则会跳过此步骤。
  • uses add_done_callback当 future 完成时收到通知,此时它恢复其调用者。

所以是的,它做同样的事情,但采用不同的实现,因为它支持更多的功能。

我假设如果其中一个任务正在进行长时间运行的阻塞操作,它实际上可能不会立即取消?

在 asyncio 中不应该有“阻塞”操作,只有那些挂起的操作,并且应该立即取消它们。例外情况是附加到 asyncio 上的阻塞代码run_in_executor,其中底层操作根本不会取消,但 asyncio 协程将立即收到异常。

也许这仅仅取决于正在使用的底层操作或库是否会立即引发 CancelledError ?

图书馆没有raise CancelledError, it receives它在等待点,在取消发生之前它恰好暂停。对于图书馆来说,取消的影响是await ...中断其等待并立即引发CancelledError。除非被捕获,否则异常将通过函数传播并await一直调用顶级协程,其引发CancelledError将整个任务标记为已取消。行为良好的 asyncio 代码将做到这一点,可能使用finally释放它们所持有的操作系​​统级资源。什么时候CancelledError被捕获,代码可以选择不重新引发它,在这种情况下,取消实际上被忽略。

是否有可能loop.run_until_complete(或者实际上,底层调用async.wait) 由于超时以外的原因返回未完成的值?

如果您正在使用return_when=asyncio.ALL_COMPLETE(默认),这应该是不可能的。这是很有可能的return_when=FIRST_COMPLETED,那么显然可以独立于超时。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 asyncio 处理超时 的相关文章

  • 如何在Python的SciPy中更改稀疏矩阵中的元素?

    我构建了一个小代码 我想用它来解决涉及大型稀疏矩阵的特征值问题 它工作正常 我现在要做的就是将稀疏矩阵中的一些元素设置为零 即最顶行中的元素 对应于实现边界条件 我可以调整下面的列向量 C0 C1 和 C2 来实现这一点 不过我想知道是否有
  • Mac OS 上的诗歌安装失败,显示“should_use_symlinks”

    我正在尝试使用以下命令安装诗歌 curl sSL https install python poetry org python3 但它失败了 但有以下例外 例外 此版本的 python 无法在不使用符号链接的情况下创建 venvs 下面是详
  • 如何在 Windows 64 上安装 NumPy?

    NumPy 安装程序在注册表中找不到 python 路径 无法安装 需要 Python 2 5 版本 但在注册表中未找到该版本 OK 我必须修改注册表吗 我已经修改了 PATH 以指向Python25安装目录 我可以检查一下您使用的是什么安
  • opencv水印周围的轮廓

    我想在图像中的水印周围画一个框 我已经提取了水印并找到了轮廓 但是 不会在水印周围绘制轮廓 轮廓是在我的整个图像上绘制的 请帮我提供正确的代码 轮廓坐标的输出为 array 0 0 0 634 450 634 450 0 dtype int
  • 使用 python 中的公式函数使从 Excel 中提取的值的百分比相等

    import xlrd numpy excel Users Bob Desktop wb1 xlrd open workbook excel assignment3 xlsx sh1 wb1 sheet by index 0 colA co
  • 为什么删除临时文件时出现WindowsError?

    我创建了一个临时文件 向创建的文件添加了一些数据 已保存 然后尝试将其删除 但我越来越WindowsError 编辑后我已关闭该文件 如何检查哪个其他进程正在访问该文件 C Documents and Settings Administra
  • 删除 Django 1.7 中的应用程序(和关联的数据库表)

    是否可以使用 Django 1 7 迁移来完全删除 卸载应用程序及其所有跟踪 主要是其所有数据库表 如果没有 在 Django 1 7 中执行此操作的适当方法是什么 python manage py migrate
  • Python 中的流式传输管道

    我正在尝试使用 Python 将 vmstat 的输出转换为 CSV 文件 因此我使用类似的方法转换为 CSV 并将日期和时间添加为列 vmstat 5 python myscript py gt gt vmstat log 我遇到的问题是
  • 工作日重新订购 Pandas 系列

    使用 Pandas 我提取了一个 CSV 文件 然后创建了一系列数据来找出一周中哪几天崩溃最多 crashes by day bc DAY OF WEEK value counts 然后我将其绘制出来 但当然它按照与该系列相同的排名顺序绘制
  • 如果未引发异常,则通过 Python 单元测试

    在Python中unittest框架 是否有一种方法可以在未引发异常的情况下通过单元测试 否则会因 AssertRaise 而失败 如果我正确理解你的问题 你could做这样的事情 def test does not raise on va
  • Pandas:如果单元格包含特定文本则删除行

    pandas 中的这段代码不起作用 如果该列包含提供的任何文本 数字 我希望它删除该行 目前 我只能在单元格与我的代码中传递的确切文本匹配时才能使其工作 因为它只删除显示 Fin 的单元格不是金融或金融 df2 df df Team Fin
  • 搜索多个字段

    我想我没有正确理解 django haystack 我有一个包含多个字段的数据模型 我希望搜索其中两个字段 class UserProfile models Model user models ForeignKey User unique
  • 在 matplotlib 中的极坐标图上移动径向刻度标签

    From matplotlib 示例 http matplotlib org examples pylab examples polar demo html import numpy as np import seaborn as sbs
  • Django send_mail SMTPSenderRefused 530 与 gmail

    一段时间以来 我一直在尝试使用 Django 从我正在开发的网站接收电子邮件 现在 我还没有部署它 并且我正在使用Django开发服务器 我不知道这是否会影响它 这是我的 settings py 配置 EMAIL BACKEND djang
  • 使用Python计算目录的大小?

    在我重新发明这个特殊的轮子之前 有没有人有一个很好的例程来使用 Python 计算目录的大小 如果例程能够很好地以 Mb Gb 等格式格式化大小 那就太好了 这会遍历所有子目录 总结文件大小 import os def get size s
  • Matplotlib 中 x 轴标签的频率和旋转

    我在下面编写了一个简单的脚本来使用 matplotlib 生成图形 我想将 x tick 频率从每月增加到每周并轮换标签 我不知道从哪里开始 x 轴频率 我的旋转线产生错误 TypeError set xticks got an unexp
  • Anaconda 无法导入 ssl 但 Python 可以

    Anaconda 3 Jupyter笔记本无法导入ssl 但使用Atom终端导入ssl没有问题 我尝试在 Jupyter 笔记本中导入 ssl 但出现以下错误 C ProgramData Anaconda3 lib ssl py in
  • Elasticsearch 通过搜索返回拼音标记

    我用语音分析插件 https www elastic co guide en elasticsearch plugins current analysis phonetic html由于语音转换 从弹性搜索中进行一些字符串匹配 我的问题是
  • Django 与谷歌图表

    我试图让谷歌图表显示在我的页面上 但我不知道如何将值从 django 视图传递到 javascript 以便我可以绘制图表 姜戈代码 array Year Sales Expenses 2004 1000 400 2005 1170 460
  • 使用ssl和socket的python客户端身份验证

    我有一个 python 服务器 需要客户端使用证书进行身份验证 我如何制作一个客户端脚本 使用客户端证书由 python 中的服务器使用 ssl 和套接字模块进行身份验证 有没有仅使用套接字和 ssl 而不扭曲的示例 from OpenSS

随机推荐