为什么 ProcessPoolExecutor 一直运行

2024-02-08

我尝试使用Python进程池执行器要计算一些 FFT 并行,请参见以下代码:

import concurrent.futures
import numpy as np
from scipy.fft import fft

def fuc(sig):
    C = fft(sig,axis=-1) 
    return C

def main()
    P, M, K = 20, 30, 1024
    FKP = np.array([P,M,K],dtype='cdouble')
    fkp = np.array([P,M,K],dtype='float32')
    fkp = np.random.rand(P,M,K)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        results = ex.map(fuc,(fkp[p,m].reshape(1,K) for p in range(P) for m in range(M)))
    FKP = list(results)

if __name__ == '__main__':
    main()

问题:

  1. 为什么内核一直很忙,但我在Windows任务管理器中没有看到4个worker?
  2. 我是否使用正确的方法在“FKP = list(results)”行中获得并行计算结果?

Q1 :
" why the kernel keeps busy, but I did not see 4 workers from windows task manager? "

A1 :
让我们在代码本身中解决这个问题:

import os
import time
...
def fuc( sig ):
    print( ( "INF[{0:}]: fuc() starts   "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.get_perf_ns(), os.getpid(), os.getppid() )
           )
    C = fft( sig, axis = -1 )
    print( ( "INF[{0:}]: fuc() FFT done "
           + "running in process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.get_perf_ns(), os.getpid(), os.getppid() )
           )
    return C

该代码将自行记录实际计算计划的 FFT 部分的时间、内容和时间。


Q2 :
" do I use the right way to get parallel calculated results in line "FKP = list(results)"? "

A2 :
是的,但是每个 SER/COMMS/DES 进程到进程边界的跨越都会产生一系列显着的附加开销成本,其中所有数据都进行 SER/DES 编码(pickle.dumps()-CPU/RAM 成本相同[TIME]- + [SPACE]-域 + 非零 ipc-p2p-传输时间 ) :

def Pinf():
    print( ( "NEW[{0:}]: ProcessPoolExecutor process-pool has "
           + "started process[{1:}]"
           + "-called-from-process[{2:}]"
             ).format( time.get_perf_ns(), os.getpid(), os.getppid() )
           )

def main():
    ...
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " entry point"
             ).format( time.get_perf_ns()
           )
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    with concurrent.futures.ProcessPoolExecutor( max_workers = 4,
                                                 initializer = Pinf
                                                 ) as ex:
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " is to start .map()"
                 ).format( time.get_perf_ns()
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        results = ex.map( fuc,
                          ( fkp[p,m].reshape( 1, K )
                            for p   in range( P )
                            for   m in range( M )
                            )
                          )
        ...
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        print( ( "INF[{0:}]: context-manager"
               + " .map() returned / __main__ has received all <_results_>"
                 ).format( time.get_perf_ns()
               )
        # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        pass
    # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    print( ( "INF[{0:}]: context-manager"
           + 30*"_" + " exited"
             ).format( time.get_perf_ns()
           )
    ...
    print( type( results ) )
    ...

有关每个进程池进程实例化的实际附加成本,请参阅报告的 ns-traces。详细信息是特定于平台的,如 { MacOS | Linux | Windows }-产生新进程的方法有很大不同。这同样适用于 Python 版本,因为较新的 Py3 版本在调用 Python 解释器进程复制方面做得很好,这与 Py2 和早期版本的 Py3.x 中常见的情况不同 - 有些复制调用 Python 的整个有状态副本-解释器,具有数据、文件描述符等的完整副本 - 由于所有关联的 RAM 分配用于存储调用 Python 解释器的 n 个副本,因此承受更大的进程实例化成本。

考虑到缩放比例:

>>> len( [ ( p, m ) for p in range( P ) for m in range( M ) ] )
600

效率很重要。仅将带有子范围索引的一个元组(p_start,p_end,m_start,m_end)传递给4个进程,其中应进行信号部分的FFT处理并返回其FFT结果的子列表,将避免传递相同的静态数据以小块形式多次传输,完全避免 596x 通过(CPU-RAM 和延迟方面)昂贵的 SER/COMMS/DES-SED/COMMS/DES ipc-p2p 数据传递通道。

欲了解更多详细信息,您可能想重新阅读this https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor and this https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么 ProcessPoolExecutor 一直运行 的相关文章

  • 内部错误:当前事务被中止,命令被忽略直到事务块结束

    使用多处理库在子进程中执行数据库调用时出现此错误 Visit Pastie http pastie org 811424 内部错误 当前事务被中止 命令被忽略直到 交易块结束 这是一个 Postgresql 数据库 使用psycopg2司机
  • 防止池进程导入 __main__ 和全局变量

    我正在使用工作人员的多处理池作为更大的应用程序的一部分 由于我用它来处理大量的简单数学 所以我有一个无共享的架构 其中工作人员需要的唯一变量作为参数传递 因此 我不需要工作子进程来导入任何全局变量 我的 main 模块 或者因此 它导入的任
  • MongoDB:在没有并行性的情况下使用 MapReduce 有什么意义?

    Quoting http www mongodb org display DOCS MapReduce MapReduce Parallelism http www mongodb org display DOCS MapReduce Ma
  • python 中的多线程:大多数时候它真的性能高效吗?

    据我所知 驱动编程的是性能因素multi threading在大多数情况下 但不是全部 无论 Java 还是 Python 我正在读这个启发性文章 https stackoverflow com questions 265687 why t
  • 在 Django 中使用多处理时,应用程序尚未加载,出现异常

    我正在做一个 Django 项目并尝试提高后端的计算速度 该任务类似于 CPU 限制的转换过程 这是我的环境 Python 3 6 1 姜戈 1 10 PostgreSQL 9 6 当我尝试通过 python 多处理库并行计算 API 时
  • OpenMP 动态调度与引导调度

    我正在研究 OpenMP 的调度 特别是不同的类型 我了解每种类型的一般行为 但澄清一下何时进行选择会很有帮助dynamic and guided调度 英特尔的文档 https software intel com en us articl
  • 多处理中的共享内存

    我有三个大清单 第一个包含位数组 模块位数组 0 8 0 另外两个包含整数数组 l1 bitarray 1 bitarray 2 bitarray n l2 array 1 array 2 array n l3 array 1 array
  • MPI Alltoallv 还是更好的单独发送和接收? (表现)

    我有许多进程 大约 100 到 1000 个 每个进程都必须将一些数据发送到其他一些进程 比如大约 10 个 通常 但并非总是必要 如果 A 发送到 B B 也会发送到 A 每个进程都知道它必须从哪个进程接收多少数据 所以我可以用MPI A
  • 使用请求和多处理时的奇怪问题

    请检查这个Python代码 usr bin env python import requests import multiprocessing from time import sleep time from requests import
  • 多处理与 gevent

    目前我正在使用带有发布 订阅模式的 Zeromq 我有一个要发布的工作人员和许多 8 个订阅者 所有人都会订阅 相同的模式 现在我尝试使用多处理来生成订阅者 它可以工作 我错过了一些消息 我使用多重处理的原因是在每条消息到达时对其进行处理
  • 如何在MPI中传递2D数组并使用C语言创建动态标签值?

    我是 MPI 编程新手 我有一个 8 x 10 数组 需要用它来并行查找每行的总和 在等级 0 进程 0 中 它将使用 2 维数组生成 8 x 10 矩阵 然后我会用tagnumber 作为数组的第一个索引值 行号 这样 我可以使用唯一的缓
  • ZeroMQ 在 python 多处理类/对象解决方案中挂起

    我正在尝试将 Python pyzmq 中的 ZeroMQ 与多处理一起使用 作为一个最小的 不是 工作示例 我有一个服务器类和一个客户端类 它们都继承自multiprocessing Process 客户端作为子进程应向服务器子进程发送消
  • 如何在多个进程之间共享字典?

    我想知道是否可以跨多个进程共享字典的内容 我一直在看http docs python org 2 library multiprocessing html shared ctypes objects http docs python org
  • Parallel.For 和 Break() 误解?

    我正在研究 For 循环中的并行性中断 看完之后this http tipsandtricks runicsoft com CSharp ParallelClass html and this http reedcopsey com 201
  • OpenMP:无法并行化嵌套 for 循环

    我想将循环与其中的内循环并行化 我的代码如下所示 pragma omp parallel for private jb ib shared n Nb lb lastBlock jj W WT schedule dynamic private
  • 使用 Python Multiprocessing Pool.map() 的问题在 Python 3.7.2 中变得棘手,但在 3.6.2 中很快完成

    我刚刚将Python从3 6 2 gt 3 7 2并且遇到了问题multiprocessing图书馆 我在 Django 应用程序中使用它 该应用程序在工作函数中使用 Django 特定的函数 见下文 在我的代码中 我有以下内容 impor
  • 超标量和 VLIW

    我想问一些关于ILP的问题 超标量处理器是标量处理器和矢量处理器的混合体 那么我可以说矢量处理器的架构遵循超标量吗 同时处理多个指令不会使体系结构超标量 因为流水线 多处理器或多核体系结构也可以实现这一点 这意味着什么 我读过 超标量 CP
  • 多处理:仅使用物理核心?

    我有一个函数foo它消耗大量内存 我想并行运行多个实例 假设我有一个有 4 个物理核心的 CPU 每个核心有两个逻辑核心 我的系统有足够的内存来容纳 4 个实例foo并行但不是 8 个 此外 由于这 8 个核心中的 4 个是逻辑核心 我也不
  • python future 和元组解包

    实现像使用 future 进行元组解包这样的事情的优雅 惯用的方法是什么 我有这样的代码 a b c f x y g a b z h y c 我想将其转换为使用期货 理想情况下我想写一些类似的东西 a b c ex submit f x y
  • 将整个包传递给雪簇

    我正在尝试并行化 使用snow parLapply 一些依赖于包 即除snow 调用函数中引用的对象parLapply必须使用显式传递给集群clusterExport 有没有办法将整个包传递到集群 而不必显式命名每个函数 包括用户函数调用的

随机推荐