Python 多处理管道非常慢(>100ms)

2024-02-29

我目前正在用 Python 3.x 编写一个图像处理程序,需要以低延迟(

目前,我正在使用管道向子进程发送命令,最重要的是在框架更新时通知它们。在测量父级的 send() 命令和子级的 receive() 命令之间的时间时,延迟始终 > 100 毫秒。我为此使用了 time.time_ns() 。

这是一个问题,因为输出源现在总是滞后 >100ms + 所有子进程完成处理所需的时间(另外 20-30ms + 所有 send() 函数之间的延迟)。

该应用程序旨在用于体育赛事直播,因此不会引入如此高的延迟。所以我有两个问题:

  1. Python 中的管道真的那么慢吗?或者是我的实施有问题。 (注:我已经在 Intel i5 第 9 代以及 Apple M1 上测试了延迟)

  2. 如果 Pipes 确实这么慢,我在 Python 中还有其他选择吗?除了诉诸某种形式的套接字之外?

Thanks.

Edit:

我在此处添加了用于测试管道延迟的代码。

import multiprocessing as mp
import time

def proc(child_conn):
    
    child_conn.recv()
    ts = time.time_ns()
    child_conn.send(ts)
    child_conn.close()

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    ts = time.time_ns()
    parent_conn.send("START")
    ts_end = parent_conn.recv()

    print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

只是为您编写了一种可能的解决方案,使用多重处理 https://docs.python.org/3/library/multiprocessing.html#process-and-exceptions物体Process https://docs.python.org/3/library/multiprocessing.html#process-and-exceptions and Queue https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.

我测量了它的吞吐速度,平均需要150 mcs(微秒)来处理一项几乎不执行任何操作的任务。处理只是从任务中获取整数,加 1 并将其发送回。我认为 150 微秒的延迟应该完全足以让你处理 30 FPS。

使用队列代替管道,因为我认为它更适合多任务处理。而且,如果您的时间测量很精确,那么队列也是660x比 Pipe 快几倍(150 微秒与 100 毫秒延迟相比)。

您可以注意到,处理循环批量发送任务,这意味着它首先向所有进程发送许多任务,然后才收集所有已发送和已处理的任务。与一次仅发送 1 个任务然后收集很少的结果相比,这种批处理使处理更加流畅。

如果您将任务发送到进程,然后在单独的轻量级线程中异步收集结果,那就更好了。这将防止您阻塞等待最慢的进程完成任务。

通过发送信号通知进程完成并退出None任务交给他们。

在线尝试一下! https://replit.com/@moytrage/StackOverflow70784547#main.py

def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time

    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()

    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size

    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()

Output:

Processing speed: 150.7 mcs per task

还测量了一次仅向所有进程发送 1 个任务(而不是一次 1000 个任务)以及一次接收 1 个任务的时间。在这种情况下,延迟是460 mcs(微秒)。因此,您可以认为,在使用队列的最坏情况下,队列的纯延迟为 460 mcs(460 mcs 包括发送和接收)。


我已经采用了您的示例片段并对其进行了一些修改以使用队列而不是管道,并且得到0.1 ms delay.

请注意,我在循环中执行此操作 5 次,因为第一次或第二次尝试初始化了一些与队列相关的内容。

在线尝试一下! https://replit.com/@moytrage/StackOverflow70784547var2#main.py

import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":

    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()

    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()

Output:

Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032

此外,在循环中运行示例多次会使第二次和其他发送/接收迭代比第一次快得多。

由于延迟初始化资源,第一次非常慢。大多数算法是延迟初始化 https://en.wikipedia.org/wiki/Lazy_initialization,这意味着它们仅在第一次调用时分配所有需要的资源。这是为了防止根本不使用算法时不必要的分配。另一方面,这使得首次调用变得更慢,因此您必须执行几次首次空调用来预热惰性算法。

在线尝试一下! https://tio.run/##hVDNSsQwEL7nKYaekmWtLqJIoQffQLT3ENq0O7iZhmQURHz2mnajm5POIYSZ72e@8R98nOn2wYdlQefnwODeTow@zL2NEWkCE8F5kYeMzgox2BFWhOyPeBp0PxOpRkCqcQ6AgATB0GTlXW6vdcHWwfbvUv1OOEK7KdfroykWs4IVLQ2SoxICR9CaTMJqaFuotHYGSeuqERvRm2CJN9a@UEguztdP6G028IfcOqeVbMJkuV2j7SH9Y1sk3KsfUh0TkJPGf5H/ClbseE5WvXSPz11VnkWnflIoofl0F5mAxHKsumQAbF4trau42MCnzAJXSUldy8PNbnevvpLBsnwD

import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

Output:

Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python 多处理管道非常慢(>100ms) 的相关文章

  • 使用 matplotlib 在图像数据之上对线网格进行像素精确定位

    我试图在 python 库 matplotlib 显示的图像网格顶部精确地覆盖 1 像素宽线的网格 不幸的是 我似乎无法对结果进行足够精细的控制 以实现线网格与数据网格的正确对齐 如下面的代码所示 结果似乎总是很接近 但并不完全正确 我尝试
  • 使用 Flask 从 Jinja 模板中的 settings.py 文件获取变量

    假设我有 settings py 文件 其中包含一堆常量 将来可能会更多 如何访问 Jinja 模板中的这些变量 Flask 会自动将您的应用程序的配置包含在标准上下文 http flask pocoo org docs templatin
  • 在Python中用空格分割字符串——保留带引号的子字符串

    我有一个像这样的字符串 this is a test 我正在尝试用 Python 编写一些内容 以将其按空格分开 同时忽略引号内的空格 我正在寻找的结果是 this is a test 附言 我知道您会问 如果引号内有引号会发生什么 嗯 在
  • Pandas:根据其他多级列对最里面的列进行分组排序

    考虑下面的 df In 3771 df pd DataFrame A a 11 B b 11 C C1 C1 C2 C1 C3 C3 C2 C3 C3 C2 C2 D D1 D2 D1 D3 D3 D2 D4 D4 D1 D2 D3 E v
  • PySerial 和多个 Python 安装出现问题

    我的 Windows 7 计算机上有 Python 2 4 4 和 3 1 3 我想使用 PySerial 听说是内置的 所以我尝试了一下import serial在两个版本中 两者都造成了Import Error 然后我从以下位置下载了w
  • Pandas Dataframe.to_csv 小数=',' 不起作用

    在 Python 中 我正在将 Pandas Dataframe 写入 csv 文件 并希望将小数分隔符更改为逗号 像这样 results to csv D Data Kaeashi BigData ProcessMining Voorbe
  • Python - 在先前已在全局范围内查找的函数内重新分配名称

    为什么我在下面的第三个代码中出现错误 但在前两个代码中却没有出现错误 我使用的是 Python 3 6 0 Anaconda 4 3 1 64 位 Jupyter Code 1 c 100 def fib c 20 a c print a
  • 在OpenCV Python中编写4通道以上图像

    这对我来说是一个持续的挑战 我正在尝试使用 openCV 将两个 3 RGB 图像组合成一个 6 通道 TIFF 图像 到目前为止我的代码如下 import cv2 import numpy as np im1 cv2 imread im1
  • 如何计算具有较大中间值的总和

    我想计算 for n m两个值都是 1000 以内的整数 最终结果是一个不大于 1000 的数字n但中间值对于 python 来说太大了 无法处理 你怎么解决这个问题 我将函数定义如下 from scipy misc import comb
  • 如何让MagicMock返回多个值

    我想模拟一个图书馆 matplotlib对于它的价值 并且遇到一个问题 当调用模拟并期望返回元组时 它会失败 有一个更好的方法吗 Python 3 7 2 default Jan 13 2019 12 50 15 Clang 10 0 0
  • 如何在 python 中连接到 GObject 信号,而不保留对连接器的引用?

    问题基本上是这样的 在 python 的 gobject 和 gtk 绑定中 假设我们有一个在构造时绑定到信号的类 class ClipboardMonitor object def init self clip gtk clipboard
  • Altair 条形图具有可变宽度的条形?

    我正在尝试在 Python 中使用 Altair 制作条形图 其中条形的宽度根据源数据帧列中的数据而变化 最终目标是获得如下所示的图表 条形的高度对应于每种能源技术的边际成本 在源数据框中以列形式给出 条形宽度对应于每种能源技术的容量 也以
  • 如何使用 Python Flask-Security 使用 bcrypt 加密密码?

    我正在尝试使用 Flask Security 文档中的标准基本示例 并使其正常工作 除了密码以明文形式存储之外 我知道这一行 user datastore create user email email protected cdn cgi
  • 仅打印字符串中的元音

    我是Python新手 我正在尝试打印字符串中的所有元音 因此 如果有人输入 嘿 一切都好吗 所有元音都需要打印 但我不知道怎么做 所以这不是计算元音 而是打印元音 现在我已经得到了这个 sentence input Enter your s
  • Anaconda (Python) - Windows 10 上的 Cmder 集成

    我在 Windows 10 64 位上通过 Anaconda 让 Cmder 使用 Python 时遇到了一些麻烦 我让 Anaconda 工作得很好 测试过用 matplotlib 绘制一些东西 它与 Anaconda Prompt 一起
  • 使用 pyinstaller 制作的可执行文件出现运行时错误

    所以我使用 Pygame 制作了一个游戏 现在我想用它制作一个可执行文件 首选独立可执行文件 所以我用它来制作可执行文件 pyinstaller onefile main py 编译顺利 但运行时出现错误 这是错误 Traceback mo
  • python 中的优化标准化

    在优化过程中 对输入参数进行归一化 使它们处于同一数量级 通常会很有帮助 这样收敛效果会更好 例如 如果我们想要最小化 f x 而合理的近似值是 x0 1e3 1e 4 则将 x0 0 和 x0 1 归一化到大约相同的数量级可能会有所帮助
  • 是否有比 .apply() 更慢或更受控制的替代方案?

    所以这似乎是一个奇怪的问题 但我有一只熊猫DataFrame其中包含地址 我想对其进行地理编码 以便获得纬度和经度 我有可以使用的代码 apply 感谢这个非常有帮助的线程 使用 geopy pandas 的新列坐标 https stack
  • 通过 Tweepy 在 Twitter 上更新状态时的回溯

    我一直在尝试使用 Twitter 在 Twitter 上发布我的 Rpi 读数tweepy 但首先我想检查一下是否tweepy本来可以正常工作 但事实并非如此 我正确安装了软件包 但是当我尝试运行简单的代码来发布某些内容时 出现错误 是的
  • vtkPythonAlgorithm 控制管道执行

    我正在尝试用 python 编写一个 vtk 过滤器ProjectDepthImage进行投影不是问题 它控制 vtk 管道的执行 基本上 我对 UserEvent 有一个回调 当用户在渲染窗口处于活动状态时按下 u 键时会触发该回调 这将

随机推荐

  • 在没有 #include wchar 的 GCC C 中,前缀 L"..." 代表什么?

    也就是说 为什么unsigned short var L 工作 但是unsigned short var L 才不是 L 属于类型wchar t 它可以隐式转换为unsigned short L 属于类型wchar t 2 不能隐式转换为u
  • 如何使用OpenID或OAuth进行内部第一方认证?

    我正在为一组 RESTful Web 应用程序的用户开发一个内部身份验证系统 我们的目的是 用户应该能够通过 Web 表单登录一次 并能够适当访问我们域中的所有这些 RESTful 应用程序 这些应用程序可能分布在跨许多服务器的私有云中 我
  • 接下来Js Router.push不是函数错误

    当我尝试使用 Router push 进行重定向时 出现以下错误 TypeError next router WEBPACK IMPORTED MODULE 3 Router push is not a function 我正在尝试从 cr
  • 记录 ASP Web API 操作的持续时间

    我正在使用 ActionFilter 来记录 ASP NET Web API 项目的所有操作调用 OnActionExecuted 方法讲述了很多有关正在发生的事情的信息 我只是不知道如何找到一种有效的方法来测量执行时间 像这样的事情应该可
  • pandoc 在转换为 pdf 时不会对代码块进行文本换行

    我正在使用 pandoc 和 xelatex 引擎将 markdown 转换为 pdf 我像这样运行 pandoc pandoc s backbone fundamentals md o backbone fundamentals pdf
  • SpecFlow - 重试失败的测试

    有没有办法实现AfterScenario在失败的情况下重新运行当前测试的钩子 像这样的东西 AfterScenario retry public void Retry if ScenarioContext Current TestError
  • 当 C 表达式中发生整数溢出时会发生什么?

    我有以下 C 代码 uint8 t firstValue 111 uint8 t secondValue 145 uint16 t temp firstValue secondValue if temp gt 0xFF return tru
  • 在 drools-camel-server 上重新加载远程 drools guvnor 资源

    我正在使用 drools camel server 5 4 Final 来执行从 jboss AS7 上的 guvnor 获取的规则 如下所示
  • 如何清除字符串流变量?

    我已经尝试过几件事了 std stringstream m m empty m clear 两者都不起作用 对于所有标准库类型的成员函数empty 是一个查询 而不是一个命令 即它的意思是 你是空的吗 而不是 请扔掉你的东西 The cle
  • 列名作为 PL/SQL ORACLE 中的变量

    我想要一个代码 其中我将列名声明为变量 然后使用此变量从某个表中检索所需的列 DECLARE col n VARCHAR 100 X BEGIN select col n from my table END Oracle 中最简单 最明确的
  • 确定 Ravenscar 程序中堆栈使用情况的最佳实践

    我正在使用 Ravenscar 子集编写一个 Ada 程序 因此 我知道执行时正在运行的任务数量 该代码是由 gcc 编译的 fstack check https gcc gnu org onlinedocs gnat ugn Stack
  • 使用 SQL Server 2012 生成包含一天中各小时的日历表

    问题陈述 我在消防部门工作 正在对我的数据进行统计分析 一个问题是生成一个日历年中每天每小时的服务呼叫数量 我需要一张可以连接到一年中每一天和每天每小时的火灾事件的表 我希望的是以下内容 使用军事时间 2017 年 1 月 1 日 00 0
  • 如何隐藏任务栏条目但保留窗口窗体?

    我想隐藏任务栏条目以最大化有效空间 因为该应用程序有一个系统托盘图标 我不需要任务栏条目 该应用程序不允许您只有一个系统托盘而不是两者都有 如何隐藏任务栏条目但保留窗口窗体 您的申请是用什么语言编写的 你想要的API调用被称为设置窗口长度
  • 删除 UIPopoverPresentationController 后面的调光视图

    我正在尝试删除使用 UIPopoverPresentationController 呈现的弹出窗口后面的变暗视图 我已经实现了自定义 UIPopoverBackgroundView 但似乎没有办法摆脱这种变暗视图 我已经使用视图层次结构检查
  • initializer_list c++11 中的求值顺序

    在下面的代码中是否需要f1之前被调用f2 或反之亦然 还是未指定 int f1 int f2 std initializer list
  • 在 Python 列表推导式中缓存值

    我正在使用以下列表理解 resources obj get file for obj in iterator if obj get file None 有没有办法 缓存 obj get file 当它在 if 语句中检查时 这样就不必调用g
  • 使用 Google OAuth 2.0 的 Nginx 代理

    我有一个 Ubuntu 14 04 服务器 并且有一个运行在以下位置的流星应用程序localhost 3000在此服务器上 我的服务器的公共 FQDN 是sub example com Meteor 应用程序使用 Google OAuth
  • 窗口调整大小指令

    我试图在窗口调整大小时调整 div 大小 环顾四周后 似乎使用指令是最好的解决方案 模板 div div 指示 myApp directive elheightresize window function window return lin
  • 如何编辑 WKWebView 显示的键盘附件视图?

    我在 Swift 应用程序中使用 WKWebView 来呈现一些文本字段 我设置了一些外观属性来匹配特定的设计 在这种情况下 其背景必须是蓝色的 但是当 WKWebView 触发键盘时 它会对外观属性执行一些操作 并以我的颜色的浅色外观显示
  • Python 多处理管道非常慢(>100ms)

    我目前正在用 Python 3 x 编写一个图像处理程序 需要以低延迟 目前 我正在使用管道向子进程发送命令 最重要的是在框架更新时通知它们 在测量父级的 send 命令和子级的 receive 命令之间的时间时 延迟始终 gt 100 毫