多处理 - 管道与队列

2023-11-27

队列和管道之间的根本区别是什么Python 的多处理包?

在什么情况下应该选择其中一种而不是另一种?什么时候使用比较有利Pipe()?什么时候使用比较有利Queue()?


简短的摘要

截至 CY2023,此答案中描述的技术已经过时。这些天,您可以使用pebble, mpire or concurrent.futures.ProcessPoolExecutor()...

无论您使用哪种 python 并发工具,OP 问题的答案仍然有效,如下所示。

ProcessPoolExector()不需要Pipe() or Queue()传达任务/结果。

原答案

  • A Pipe()只能有两个端点。

  • A Queue()可以有多个生产者和消费者。

何时使用它们

如果您需要两个以上的点进行通信,请使用Queue().

如果您需要绝对的性能,Pipe()速度更快,因为Queue()是建立在Pipe().

性能基准测试

假设您想要生成两个进程并尽快在它们之间发送消息。这些是使用类似测试进行拉力赛的计时结果Pipe() and Queue()...

仅供参考,我输入了结果SimpleQueue() and JoinableQueue()作为奖励。

  • JoinableQueue()考虑任务时queue.task_done()被调用(它甚至不知道具体的任务,它只是计算队列中未完成的任务),这样queue.join()知道工作已经完成。

这个答案底部的每个代码......

# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2

$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$  python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds



# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0

(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds


# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

总之:

  • 在Python 2.7下,Pipe()比a快约300%Queue()。甚至不要考虑JoinableQueue()除非你确实必须拥有这些好处。
  • 在 python 3.x 下,Pipe()仍然比其他国家有(大约 20%)优势Queue()s,但之间的性能差距Pipe() and Queue()不像 Python 2.7 中那样引人注目。各种种类Queue()各实施方案之间的差异大约在 15% 以内。我的测试也使用整数数据。有些人评论说,他们发现多处理所使用的数据类型存在性能差异。

python 3.x 的底线:YMMV...考虑使用您自己的数据类型(即整数/字符串/对象)运行您自己的测试,以得出有关您自己感兴趣的平台和用例的结论.

我还应该提到,我的 python3.x 性能测试不一致并且有所不同。我在几分钟内进行了多次测试,以获得每种情况的最佳结果。我怀疑这些差异与在 VMWare/虚拟化下运行我的 python3 测试有关;然而,虚拟化诊断只是猜测。

*** 响应关于测试技术的评论 ***

评论里@JJCsaid:

更公平的比较是运行 N 个工作线程,每个工作线程通过点对点管道与主线程通信,与运行 N 个工作线程全部从单个点对多点队列拉取的性能相比。

本来这个答案只考虑了一名工人和一名生产者的表现;这是基准用例Pipe()。您的评论需要为多个工作进程添加不同的测试。虽然这是对常见现象的有效观察Queue()用例,它可以轻松地沿全新的轴分解测试矩阵(即添加具有不同数量的工作进程的测试)。

奖励材料 2

多重处理引入了信息流的微妙变化,这使得调试变得困难,除非您知道一些快捷方式。例如,您可能有一个脚本,在许多条件下通过字典进行索引时都可以正常工作,但在处理某些输入时很少会失败。

通常当整个Python进程崩溃时我们就能得到失败的线索;但是,如果多处理函数崩溃,您不会将未经请求的崩溃回溯打印到控制台。如果不知道进程崩溃的原因,追踪未知的多处理崩溃是很困难的。

我发现跟踪多处理崩溃信息的最简单方法是将整个多处理函数包装在一个try / except并使用traceback.print_exc():

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

现在,当您发现崩溃时,您会看到类似以下内容:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

源代码:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in range(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

"""
multi_simplequeue.py
"""

from multiprocessing import Process, SimpleQueue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

多处理 - 管道与队列 的相关文章

  • matplotlib:调整图形窗口大小而不缩放图形内容

    当您调整图形大小时 Matplotlib 会自动缩放图形窗口中的所有内容 通常这是用户想要的 但我经常想增加窗口的大小 为其他东西腾出更多空间 在这种情况下 我希望在更改窗口大小时预先存在的内容保持相同的大小 有谁知道一个干净的方法来做到这
  • 测试交互式Python程序

    我想知道python的哪些测试工具支持交互式程序的测试 例如 我有一个由以下人员启动的应用程序 python dummy program py gt gt Hi whats your name Joseph 我想要仪器Joseph所以我可以
  • 成员初始值设定项列表和分配之间是否存在性能差异? [复制]

    这个问题在这里已经有答案了 我最近与一位朋友讨论 他们说在 C 中创建对象时使用初始化列表 而不是简单地分配数据成员 可以提高性能 为什么会这样 如果这是真的 我找到了这个页面 http www parashift com c faq in
  • python blpapi安装错误

    我试图根据 README 中的说明为 python 安装 blpapi 3 5 5 但是在运行时 python setup py install 我收到以下错误 running install running build running b
  • 如何使用 python http.server 运行 CGI“hello world”

    我使用的是 Windows 7 和 Python 3 4 3 我想在浏览器中运行这个简单的 helloworld py 文件 print Content Type text html print print print print h2 H
  • Scrapy Splash,如何处理onclick?

    我正在尝试抓取以下内容 我能够收到响应 但我不知道如何访问以下项目的内部数据以抓取它 我注意到访问这些项目实际上是由 JavaScript 和分页处理的 这种情况我该怎么办 下面是我的代码 import scrapy from scrapy
  • 如何检查 webgl(two.js) 的客户端性能

    我有一个使用 Three JS 的图形项目 现在我想自动检查客户端 GPU 性能并计算可以在应用程序中加载多少元素 我想到了诸如 GPU 基准测试之类的东西 看一眼stats js https github com mrdoob stats
  • 淘汰赛应用程序的性能调整 - 改进响应时间的指南

    我有一个大型 复杂的页面 严重依赖于 Knockout js 性能开始成为一个问题 但检查调用堆栈并试图找到瓶颈是一个真正的挑战 我在另一个问题中注意到 Knockout js 理解 foreach 和 with https stackov
  • 如何解决CDK CLI版本不匹配的问题

    我收到以下错误 此 CDK CLI 与您的应用程序使用的 CDK 库不兼容 请将CLI升级到最新版本 云程序集架构版本不匹配 支持的最大架构版本为 8 0 0 但发现为 9 0 0 发出后cdk diff命令 我确实跑了npm instal
  • TypeError:“NoneType”对象不可下标[重复]

    这个问题在这里已经有答案了 错误 names curfetchone 0 TypeError NoneType object is not subscriptable 我尝试检查缩进 但仍然有错误 我读到 如果数据库中没有文件名记录 变量名
  • 如何使用 Python 实现并行 gzip 压缩?

    使用python压缩大文件 https stackoverflow com questions 9518705 big file compression with python给出了一个很好的例子来说明如何使用例如bz2 纯粹用 Pytho
  • 使用 conda 安装额外功能

    With pip我们可以使用方括号安装子包 例如与阿帕奇气流 https pythonhosted org airflow installation html pip install airflow all 有类似的东西吗conda或者我必
  • 在 matplotlib 中将 3D 背景更改为黑色

    我在将 3D 图表的背景更改为黑色时遇到问题 这是我当前的代码 当我将facecolor设置为黑色时 它会将图表内部更改为灰色 这不是我想要的 fig plt figure fig set size inches 10 10 ax plt
  • 如何输入可变的默认参数

    Python 中处理可变默认参数的方法是将它们设置为无 https stackoverflow com a 366430 5049813 例如 def foo bar None bar if bar is None else bar ret
  • 尝试 Catch 性能 Java

    当捕获异常而不是进行检查时 try catch 需要多长时间 以纳秒为单位 假设消息具有用于查找的 HashMap 类型性能 try timestamp message getLongField MessageField TIMESTAMP
  • python:xml.etree.ElementTree,删除“命名空间”

    我喜欢 ElementTree 解析 xml 的方式 特别是 Xpath 功能 我有一个带有嵌套标签的应用程序的 xml 输出 我想按名称访问此标签而不指定名称空间 这可能吗 例如 root findall molpro job 代替 ro
  • “yield item”与 return iter(items) 相比有何优点?

    在下面的示例中 resp results 是一个迭代器 版本1 items for result in resp results item process result items append item return iter items
  • 如何通过 Selenium 内部的文本查找按钮(Python)?

    我有以下三个按钮 我不知道如何获取其中的文本 例如异常值 我试过browser find element by link text Outliers click 但出现 无法找到元素 错误 我该怎么做 See find element by
  • 如何对每一行进行 value_counts 并创建一些列,其值是每个值的计数

    我得到一个数据框如下 df c1 c2 c3 c4 c5 c6 c7 c8 c9 c10 c11 c12 r1 0 1 1 1 1 0 0 0 0 0 0 0 r2 1 2 2 2 2 1 1 1 1 0 0 0 r3 1 0 2 0 0
  • Pandas 2 个字段中唯一值的数量

    我正在尝试查找覆盖 2 个字段的唯一值的数量 例如 一个典型的例子是姓氏和名字 我有一个数据框 当我执行以下操作时 我只获取每列的唯一字段数 在本例中为 最后一个 和 第一个 不是复合体 df Last Name First Name nu

随机推荐