是否可以检测所有异步任务何时暂停?

2024-02-27

我正在尝试测试异步代码,但由于某些任务之间的复杂连接而遇到了麻烦。

我需要的上下文是一些代码,它与另一个进程写入文件的同时并行读取文件。代码中有一些逻辑,读取被截断的记录会使其后退并wait() on an asyncio.Condition稍后由inotify https://man7.org/linux/man-pages/man7/inotify.7.html事件。当另一个进程完成将来的写入时,此代码应该让它通过重新读取记录来恢复。我特别想测试一下这种恢复是否有效。

所以我的计划是:

  • 写入部分文件
  • 运行事件循环,直到它根据条件暂停
  • 写入文件的其余部分
  • 运行事件循环直至完成

我原以为这是答案:检测空闲异步事件循环 https://stackoverflow.com/questions/47369252/detect-an-idle-asyncio-event-loop

然而,试验表明它退出得太早了:

import asyncio
import random


def test_ping_pong():
    async def ping_pong(idx: int, oth_idx: int):
        for i in range(random.randint(100, 1000)):
            counters[idx] += 1
            async with conditions[oth_idx]:
                conditions[oth_idx].notify()
            async with conditions[idx]:
                await conditions[idx].wait()

    async def detect_iowait():
        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        wsock.close()
        try:
            await loop.sock_recv(rsock, 1)
        finally:
            rsock.close()
    conditions = [asyncio.Condition(), asyncio.Condition()]
    counters = [0, 0]


    loop = asyncio.get_event_loop()
    loop.create_task(ping_pong(0, 1))
    loop.create_task(ping_pong(1, 0))
    loop.run_until_complete(loop.create_task(detect_iowait()))

    assert counters[0] > 10
    assert counters[1] > 10

在深入研究了 python 事件循环的源代码后,我发现没有任何公开的内容可以公开执行此操作。

然而,可以使用_ready双端队列由创建BaseEventLoop. See here https://github.com/python/cpython/blob/5efb1a77e75648012f8b52960c8637fc296a5c6d/Lib/asyncio/base_events.py#L390。这包含立即准备运行的每个任务。当任务运行时,它会从_ready双端队列。当挂起的任务被另一个任务释放时(例如通过调用future.set_result() https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_result) 挂起的任务立即添加回双端队列。这是从 python 3.5 开始就存在的。

您可以做的一件事是重复注入回调以检查中有多少项目_ready。当所有other任务被挂起,回调运行时 dqueue 中将没有任何内容。

事件循环的每次迭代最多运行一次回调:

async def wait_for_deadlock(empty_loop_threshold: int = 0):
    def check_for_deadlock():
        nonlocal empty_loop_count

        # pylint: disable=protected-access
        if loop._ready:
            empty_loop_count = 0
            loop.call_soon(check_for_deadlock)
        elif empty_loop_count < empty_loop_threshold:
            empty_loop_count += 1
            loop.call_soon(check_for_deadlock)
        else:
            future.set_result(None)

    empty_loop_count = 0
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    asyncio.get_running_loop().call_soon(check_for_deadlock)
    await future

在上面的代码中empty_loop_threshold在大多数情况下并不是真正必要的,但在任务与 IO 通信的情况下存在。例如,如果一个任务通过 IO 与另一个任务通信,则可能会有一个时刻所有任务都被挂起,即使其中一个任务已准备好读取数据。环境empty_loop_threshold = 1应该解决这个问题。

使用这个相对简单。你可以:

loop.run_until_complete(wait_for_deadlock())

或者按照我的问题中的要求:

def some_test():
    async def async_test():
        await wait_for_deadlock()
        inject_something()
        await wait_for_deadlock()
            

    loop = loop.get_event_loop()
    loop.create_task(task_to_test())
    loop.run_until_complete(loop.create_task(async_test)
    assert something
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以检测所有异步任务何时暂停? 的相关文章

  • 分配列表的多个值

    我很想知道是否有一种 Pythonic 方式将列表中的值分配给元素 为了更清楚 我要求这样的事情 myList 3 5 7 2 a b c d something myList So that a 3 b 5 c 7 d 2 我正在寻找比手
  • Python设置1和True的解释

    在 IPython 3 交互式 shell 中 In 53 set2 1 2 True hello In 54 len set2 Out 54 3 In 55 set2 Out 55 hello True 2 是因为 1 和 True 得到
  • 如何使用我自己的自定义表单覆盖 django-rest-auth 中的表单?

    我正在使用 django rest auth 并尝试通过覆盖表单的方法之一来修复密码重置视图中的错误 尽管我已经使用不同的 django rest auth 表单成功完成了类似的操作 但我无法让它在这个表单上工作 无论我做什么 都会使用旧的
  • 如何在seaborn热图标签中使用科学计数法?

    我正在尝试在 python 中使用seaborn 获取热图 不幸的是 即使数字非常大 它也没有使用科学记数法 我想知道是否有任何简单的方法可以转换为科学记数法或任何其他合理的格式 这是显示问题的一段代码 import seaborn as
  • Django 查询:“datetime + delta”作为表达式

    好吧 我的问题如下 假设我有下一个模型 这是一个简单的情况 class Period models Model name CharField field specs here start date DateTimeField field s
  • Python函数组成

    我尝试使用良好的语法来实现函数组合 这就是我所得到的 from functools import partial class compfunc partial def lshift self y f lambda args kwargs s
  • 在 PhotoImage 下调整图像大小

    我需要调整图像大小 但我想避免使用 PIL 因为我无法使其在 OS X 下工作 不要问我为什么 无论如何 因为我对 gif pgm ppm 感到满意 所以 PhotoImage 类对我来说没问题 photoImg PhotoImage fi
  • Python 中 time.sleep 和多线程的问题

    我对 python 中的 time sleep 函数有疑问 我正在运行一个脚本 需要等待另一个程序生成 txt 文件 虽然 这是一台非常旧的机器 所以当我休眠 python 脚本时 我遇到了其他程序不生成文件的问题 除了使用 time sl
  • 如何调试 numpy 掩码

    这个问题与this one https stackoverflow com q 73672739 11004423 我有一个正在尝试矢量化的函数 这是原来的函数 def aspect good angle float planet1 goo
  • 当我从本地计算机更改为虚拟主机时,从 python 脚本调用 pdftotext 不起作用

    我编写了一个小的 python 脚本来解析 提取 PDF 中的信息 我在本地机器上测试了它 我有 python 2 6 2 和 pdftotext 版本 0 12 4 我正在尝试在我的虚拟主机服务器 dreamhost 上运行它 它有 py
  • 如何在 numpy 数组中查找并保存重复的行?

    我有一个数组 例如 Array 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 1 1 1 2 2 2 我想要输出以下内容的东西 Repeated 1 1 1 2 2 2 保留重复行的数量也可以 例如 Repeated 1 1
  • 将多索引转换为行式多维 NumPy 数组。

    假设我有一个类似于以下示例的 MultiIndex DataFrame多索引文档 http pandas pydata org pandas docs stable advanced html gt gt gt df 0 1 2 3 fir
  • dask apply:AttributeError:“DataFrame”对象没有属性“name”

    我有一个参数数据框 并对每一行应用一个函数 该函数本质上是几个 sql queries 和对结果的简单计算 我正在尝试利用 Dask 的多处理 同时保持结构和界面 下面的例子有效并且确实有显着的提升 def get metrics row
  • 无法在我的程序中使用 matplotlib 函数

    我正在 Windows 10 中运行 Anaconda 安装 conda 版本 4 3 8 这是我尝试在 python 命令行中运行的代码 import matplotlib pyplot as plt x 1 2 3 4 y 5 6 7
  • 打印一份拥有多个家庭的人员名单,每个家庭都有多个电话号码

    我有一类 Person 它可以有多个 Home 每个 Home 都有一个或多个电话号码 我已经定义了类 但现在我正在尝试创建一个视图 其中列出每个人的所有家庭以及每个家庭地址的所有电话号码 类似于 john smith 123 fake s
  • 如何仅注释堆积条形图的一个类别

    我有一个数据框示例 如下所示 data Date 2021 07 18 2021 07 19 2021 07 20 2021 07 21 2021 07 22 2021 07 23 Invalid NaN 1 1 NaN NaN NaN N
  • 同一台机器上有多个Python版本?

    Python 网站上是否有关于如何在 Linux 上的同一台计算机上安装和运行多个版本的 Python 的官方文档 我可以找到无数的博客文章和答案 但我想知道是否有 标准 官方方法可以做到这一点 或者这一切都取决于操作系统 我认为它是完全独
  • 在 for 循环中访问 itertools 产品的元素

    我有一个列表列表 是附加 itertools 产品的一些其他结果的结果 我想要的是能够使用 for 循环访问列表列表中列表的每个元素 但我无法访问所有元素 我只能访问最后一个列表的元素 结果是一个非常巨大的列表列表 例如 1 2 4 3 6
  • python 日志记录替代方案 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 蟒蛇记录模块 http docs python org library logging html使用起来
  • 用 Beautiful Soup 进行抓取:为什么 get_text 方法不返回该元素的文本?

    最近我一直在用 python 开发一个项目 其中涉及抓取一些网站的一些代理 我遇到的问题是 当我尝试抓取某个知名代理站点时 当我要求 Beautiful Soup 查找 IP 在代理表中的位置时 它并没有按照我的预期执行操作 我将尝试查找每

随机推荐