并发.futures问题:为什么只有1个worker?

2024-04-28

我正在尝试使用concurrent.futures.ProcessPoolExecutor并行化串行任务。串行任务涉及从数字范围中查找给定数字的出现次数。我的代码如下所示。
在执行过程中,我从任务管理器/系统监视器/顶部注意到,尽管给定了 max_workers ,但只有一个 cpu/线程在持续运行processPoolExecutor值大于1。为什么会这样呢?如何使用并行化我的代码concurrent.futures?我的代码是用 python 3.5 执行的。

import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))

您的代码的问题在于,它仅提交一项任务,然后由其中一名工作人员执行该任务,而其余工作人员则不执行任何操作。您需要提交多个可以由工作人员并行执行的任务。

下面的示例将搜索区域分为三个不同的任务,每个任务由不同的工作人员执行。期货返还方式submit https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.submit被添加到列表中,一旦全部提交wait https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait用于等待它们全部完成。如果你打电话result https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result提交任务后,它将立即阻塞,直到未来完成。

请注意,下面的代码不是生成数字列表,而是只计算其中包含数字 5 的数字,以减少内存使用量:

import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []

        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

Output:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

并发.futures问题:为什么只有1个worker? 的相关文章

  • Python Numpy Reshape错误[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我在尝试重塑 3D numpy 数组时遇到一个奇怪的错误 数组 x 的形状为 6 10 300 我想将其重塑为 6 3000 我正
  • 绝对导入不起作用,但相对导入起作用

    这是我的应用程序结构 foodo setup py foodo init py foodo py models py foodo foodo foodo py从导入类models py module from foodo models im
  • 如何在动态执行的代码字符串中使用inspect.getsource?

    如果我在文件中有这段代码 import inspect def sample p1 print p1 return 1 print inspect getsource sample 当我运行脚本时 它按预期工作 在最后一行 源代码sampl
  • 如何调试 numpy 掩码

    这个问题与this one https stackoverflow com q 73672739 11004423 我有一个正在尝试矢量化的函数 这是原来的函数 def aspect good angle float planet1 goo
  • 如何在 numpy 数组中查找并保存重复的行?

    我有一个数组 例如 Array 1 1 1 2 2 2 3 3 3 4 4 4 5 5 5 1 1 1 2 2 2 我想要输出以下内容的东西 Repeated 1 1 1 2 2 2 保留重复行的数量也可以 例如 Repeated 1 1
  • 将多索引转换为行式多维 NumPy 数组。

    假设我有一个类似于以下示例的 MultiIndex DataFrame多索引文档 http pandas pydata org pandas docs stable advanced html gt gt gt df 0 1 2 3 fir
  • 烧瓶 - 404 未找到

    我是烧瓶开发的新手 这是我在烧瓶中的第一个程序 但它向我显示了这个错误 在服务器上找不到请求的 URL 如果您输入了网址 请手动检查拼写并重试 这是我的代码 from flask import Flask app Flask name ap
  • 样本()和r样本()有什么区别?

    当我从 PyTorch 中的发行版中采样时 两者sample and rsample似乎给出了类似的结果 import torch seaborn as sns x torch distributions Normal torch tens
  • dask apply:AttributeError:“DataFrame”对象没有属性“name”

    我有一个参数数据框 并对每一行应用一个函数 该函数本质上是几个 sql queries 和对结果的简单计算 我正在尝试利用 Dask 的多处理 同时保持结构和界面 下面的例子有效并且确实有显着的提升 def get metrics row
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • 如何让 Streamlit 每 5 秒重新加载一次?

    我必须每 5 秒重新加载 Streamlit 图表 以便在 XLSX 报告中可视化新数据 如何实现这一目标 import streamlit as st import pandas as pd import os mainDir os pa
  • 将 Python Selenium 输出写入 Excel

    我编写了一个脚本来从在线网站上抓取产品信息 目标是将这些信息写入 Excel 文件 由于我的Python知识有限 我只知道如何在Powershell中使用Out file导出 但结果是每个产品的信息都打印在不同的行上 我希望每种产品都有一条
  • Python 或 C 语言中的 Matlab / Octave bwdist()

    有谁知道 Matlab Octave bwdist 函数的 Python 替代品 此函数返回给定矩阵的每个单元格到最近的非零单元格的欧几里得距离 我看到了一个 Octave C 实现 一个纯 Matlab 实现 我想知道是否有人必须用 AN
  • 如何在 Python 中跟踪日志文件?

    我想在 Python 中提供 tail F 或类似内容的输出 而无需阻塞或锁定 我找到了一些非常旧的代码来做到这一点here http code activestate com recipes 436477 filetailpy 但我认为现
  • 更改用作函数全局作用域的字典

    我想做一个 purePython 的装饰器 其中一部分是能够有选择地禁止访问函数的全局范围 有没有一种方法可以以编程方式更改哪个字典事物充当函数的全局 外部作用域 因此 例如在下面我希望能够拦截对f in h并抛出错误 但我想允许访问g因为
  • 有没有办法拉伸整个显示图像以适应给定的分辨率?

    我最近一直在使用pygame制作游戏 遇到了一个小问题 基本上 我希望能够将屏幕上的整个图像 我已经传输到它的所有内容 拉伸到用户将窗口大小调整到的分辨率 我在 pygame 和堆栈溢出的文档中搜索了很多 但我似乎找不到答案 这可能吗 我的
  • 重定向 python 交互式帮助()

    我正在为使用 Qt 的应用程序开发交互式 python shell 但是我似乎无法获得重定向的交互式帮助 我的 python 代码中有这个 class OutputCatcher def init self self data def wr
  • 如何在supervisord中设置组?

    因此 我正在设置 Supervisord 并尝试控制多个进程 并且一切正常 现在我想设置一个组 以便我可以启动 停止不同的进程集 而不是全部或全无 这是我的配置文件的片段 group tapjoy programs tapjoy game1
  • 在 python 中使用高精度时间戳

    嘿 我正在使用 python 处理日期时间 我想知道解析这个时间戳的最佳方法是什么 时间戳是ISO标准 这里是一个例子 2010 06 19T08 17 14 078685237Z 现在到目前为止我已经使用过 time datetime d
  • 防止 Ada DLL 中的名称损坏

    有没有一种简单的方法可以防止在创建 Ada DLL 时 Ada 名称被破坏 这是我的 adb 代码 with Ada Text IO package body testDLL is procedure Print Call is begin

随机推荐