具有更新队列和输出队列的 Python 多处理

2024-01-31

如何编写使用两个队列的 Python 多进程脚本?:

  1. 一个作为工作队列,以一些数据开始,并且根据要并行化的函数的条件,动态接收更多任务,
  2. 另一个收集结果并用于在处理完成后写下结果。

我基本上需要根据我在初始项目中发现的内容在工作队列中放入更多任务。我在下面发布的示例很愚蠢(我可以根据需要转换该项目并将其直接放入输出队列中),但其机制很清晰并且反映了我需要开发的部分概念。

特此我的尝试:

import multiprocessing as mp

def worker(working_queue, output_queue):
    item = working_queue.get() #I take an item from the working queue
    if item % 2 == 0:
        output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
    else:
        working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__':
    static_input = range(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    for result in iter(output_q.get, None):
        print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.

这不会结束也不会打印任何结果。

在整个过程结束时,我想确保工作队列为空,并且所有并行函数都已完成对输出队列的写入,然后再迭代后者以取出结果。您对如何使其发挥作用有什么建议吗?


下面的代码达到了预期的效果。它遵循@tawmas 提出的建议。

此代码允许在一个进程中使用多个内核,该进程要求向工作线程提供数据的队列可以在处理过程中由它们更新:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() == True:
            break #this is the so-called 'poison pill'    
        else:
            picked = working_queue.get()
            if picked % 2 == 0: 
                    output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return

if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    results_bank = []
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
       if output_q.empty() == True:
           break
       results_bank.append(output_q.get_nowait())
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

具有更新队列和输出队列的 Python 多处理 的相关文章

  • 使用请求验证 SSL 证书

    我正在尝试验证 SSL 但它不起作用 我在浏览器上访问了我想要访问的机密网站 在 Chrome 上 我单击了储物柜 gt 证书 gt 详细信息 gt 复制到文件 gt base64 gt cert cer 我的代码是 test reques
  • 如何使用 cython 编译扩展?

    我正在尝试从示例页面编译一个简单的 cython 扩展here http docs cython org src userguide tutorial html在我安装了 Python 2 6 64 位版本的 Windows 7 64 位计
  • 为什么 .setGeometry() 不改变 QWidget 实例的大小?

    我想使用 QWidget 更改 QPushButton 的大小 setGeometry https doc qt io qtforpython 5 PySide2 QtWidgets QWidget html PySide2 QtWidge
  • 一次将Python dict的内容分配给多个变量?

    我想做这样的事情 def f return a 1 b 2 c 3 a b f or a b f IE 这样 a 被分配为 1 b 被分配为 2 并且 c 是未定义的 这与此类似 def f return 1 2 a b f 依赖于变量名称
  • Tensorflow 可变图像输入大小(自动编码器、放大......)

    Edit WARNING不建议使用不同图像大小的图像 因为张量需要具有相同的大小才能实现并行化 我一直在寻找解决方案 了解如何使用不同大小的图像作为神经网络的输入 Numpy 第一个想法是使用numpy 然而 由于每个图像的大小不同 我无法
  • 带图像的简单 GUI [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我试图在简单的 GUI 上显示一些卡
  • Python igraph:从图中删除顶点

    我正在使用安然电子邮件数据集 并尝试删除没有 enron com 的电子邮件地址 即我只想拥有安然电子邮件 当我尝试删除那些没有 enron com 的地址时 一些电子邮件由于某些原因被跳过 下面显示了一个小图 其中顶点是电子邮件地址 这是
  • 定义函数后对其进行修饰?

    I think答案是否定的 但我似乎找不到明确的说法 我有以下情况 def decorated function function functools wraps function def my function print Hello s
  • matplotlib matshow 标签

    我一个月前开始使用 matplotlib 所以我仍在学习 我正在尝试用 matshow 制作热图 我的代码如下 data numpy array a reshape 4 4 cax ax matshow data interpolation
  • 具有多个元素的数组的真值是二义性错误吗? Python

    from numpy import from pylab import from math import def TentMap a x if x gt 0 and x lt 0 5 return 2 a x elif x gt 0 5 a
  • Python:计算数据帧列中所有行中特定字符的实例数

    我有一个包含列 toaddress ccaddress body 的数据框 df 我想迭代数据帧的索引 以获取 toaddress 和 ccaddress 字段中电子邮件地址的最小 最大和平均数量 这是通过计算这两列中每个字段中的 和 的实
  • Python 中的 @staticmethod 与 @classmethod

    方法和方法有什么区别装饰的 https peps python org pep 0318 with staticmethod http docs python org library functions html staticmethod和
  • conda-env list / conda info --envs 如何查找环境?

    我一直在尝试 anaconda miniconda 因为我的用户使用随 miniconda 安装的结构生物学程序 并且作者都没有 A 考虑到可能存在其他 miniconda 应用程序 B 他们的程序将在多用户环境中使用 因此 使用 Arch
  • Matplotlib Scatter - ValueError:RGBA 序列的长度应为 3 或 4

    我正在尝试为我的功能绘制图表 但不断收到此错误 ValueError RGBA sequence should have length 3 or 4 每当我只有 6 种形状时 代码就可以完美运行 但现在我将其增加到 10 种 它就不起作用了
  • 查找给定节点的最高权重边

    我在 NetworkX 中有一个有向图 边缘的权重从 0 到 1 表示它们发生的概率 网络连通性非常高 所以我想修剪每个节点的边缘 只保留最高概率的节点 我不确定如何迭代每个节点并仅保留最高权重in edges在图中 有没有一个networ
  • Flask WTForms 使用变量自动填充 StringField

    我有一个表格 我想用上一页收到的信息自动填充一些字段 但如果他们想调整它 它需要是可更改的 我正在为我的 SelectField 使用动态创建的列表 但添加 StringField 并不成功 请参阅下面的我的代码 forms py clas
  • 为什么实现 __iter__ 的对象不被识别为可迭代的?

    假设您使用包装对象 class IterOrNotIter def init self self f open tmp toto txt def getattr self item try return self getattribute
  • Python - 如何查询定义方法的类?

    我的问题有点类似于this one https stackoverflow com questions 5520580 how do you get all classes defined in a module but not impor
  • Python 3.2 中 **kwargs 和 dict 有什么区别?

    看起来Python的很多方面都只是功能的重复 除了我在 Python 中的 kwargs 和 dict 中看到的冗余之外 还有什么区别吗 参数解包存在差异 许多人使用kwargs 并通过dict作为论据之一 使用参数解包 Prepare f
  • 如何将列表字典写入字符串而不是 CSV 文件?

    This 堆栈溢出问题 https stackoverflow com questions 37997085 how to write a dictionary of lists to a csv file将列表字典写入 CSV 文件的答案

随机推荐

  • Liquibase 3.0.1 Gradle 集成

    我们使用 liquibase 进行数据库版本控制 使用 gradle 作为构建工具 目前我们使用 liquibase in version 2 0 5 com augusttechgroup gradle liquibase plugin
  • 使用 google apps 脚本保护范围

    我有很多纸张需要保护 但某些范围除外 是否可以使用脚本来执行此操作 因为我有几张工作表 并且工作表中的许多范围需要不受保护 以便工作人员可以编辑工作表 我需要不受保护的范围是重复的 所以我希望它是可行的 我将在我给您的示例表上用黄色填充我想
  • site_url() 和 base_url() 有什么区别?

    正如我读过的一些资源 base url and site url 功能于Codeigniter几乎相同 尽管我的 Codeigniter 版本 2 1 3 在其 config php 文件 在 config 目录中 中没有 site url
  • 如何在 Mac 上的 webkit 中消除 html5 搜索输入中的水平填充或缩进?

    仅在 Mac 上的 webkit 中 搜索输入中的文本从左侧缩进 这是一个演示 http oscorp net experiments search input 即使在剥离所有填充 文本缩进和设置之后 webkit appearance t
  • gem5 系统调用模拟 OpenBLAS cblas_dgemm 失败并显示“致命:系统调用 mbind (#237) 未实现”

    我正在开发一个程序 我需要在 SE 模式下模拟使用 gem5 调用 OpenBLAS 函数的程序 我的代码 C语言 如下 include
  • 循环记录的列

    我需要循环类型RECORD按键 索引排列的项目 就像我可以使用其他编程语言中的数组结构来做到这一点 例如 DECLARE data1 record data2 text BEGIN FOR data1 IN SELECT FROM some
  • 数据库设计-具有属性的多类别产品

    我正在为供应商设计一个基本的库存系统 他们有许多不同的产品类别 每个产品类别都有许多不同的属性 A x1 x2 x3 a1 a2 a3 B x1 x2 x3 b1 b2 b3 b4 C x1 x2 x3 c1 c2 Laptop Make
  • WPF - AvalonDock - 关闭文档

    我在 WPF 项目中将 AvalonDock 与 MVVM 结合使用 当我点击 X 选项卡的关闭按钮 时 我的文档将关闭但保留在内存中 看来只是隐藏而已 它没有从我的中删除Model Documents收藏 如果我添加DockingMana
  • Objective-C 对象可以成为它自己的委托吗?这是良好的编程习惯吗?

    我知道这是可能的 但这真的是一个好的编程实践吗 这个想法是子类化UIAlertView并订阅我自己作为我自己的代表 以便能够添加按钮和块处理程序 这样 当我得到alertView clickedButtonAtIndex 我将传递的块称为
  • 实例化地形区域内的对象

    using System Collections using System Collections Generic using UnityEngine public class Teleport MonoBehaviour public V
  • 与 DateSeparator 和 LongTimeFormat 相关的未声明标识符错误[重复]

    这个问题在这里已经有答案了 我有一个在Delphi 5中创建的程序 该程序在Win7和Win8上仍然运行得近乎完美 但是 当尝试在 XE6 试用版 中运行此代码时 我遇到两个错误 但无法修复 DateSeparator 和 LongTime
  • 在带有 Spring Boot 的 JSP 中使用自定义标记文件

    我有一个 Spring Boot 项目 我尝试在 JSP 文件中进行以下调用
  • FFmpeg 输出文件格式,无扩展名

    我正在开发一个系统 需要以以下形式存储视频 path to video
  • Python 一维 numpy 数组的中值过滤器

    我有一个numpy array有一个维度dim array 我期待获得像这样的中值滤波器scipy signal medfilt data window len 这实际上不适用于numpy array可能是因为维度是 dim array 1
  • python中的数字输入识别

    我正在尝试制作一个脚本来询问数学方程 然后用户必须输入他们认为的答案 然后 python 会输出答案 然而 由于某种原因 python 不喜欢 raw input 与 eval 语句相关 例如 代码是 print What s 5 4 a
  • 如何在 Spark 2.4 中加载自定义变压器

    我正在尝试在 Spark 2 4 0 中创建自定义变压器 保存起来效果很好 但是 当我尝试加载它时 出现以下错误 java lang NoSuchMethodException TestTransformer
  • 如何让 Unity 3D 中的对象保留在场景中并且不会重新创建

    我正在尝试找到一种在 Unity 3D 中播放背景音乐的好方法 我希望音乐在场景加载中保持一致播放 加载时不要破坏很好并且有效 但是每次我加载同一个场景时 它都会生成另一个音乐游戏对象 因为场景本身包含游戏对象 我该如何解决我的问题 我是一
  • aspx 和 aspx.cs 文件之间的链接断开

    在重命名不同的 ASPX 页面后 我曾多次遇到同样的问题 令我惊讶的是我在 stackoverflow 上找不到其他人也有同样的问题 当我运行 ASP NET C 项目时 调试器会向我显示一条类似这样的消息 Error 5 The name
  • 使用scrapy提取XHR请求?

    我正在尝试抓取使用 javascript 生成的社交点赞计数 如果我绝对引用 XHR url 我就能够抓取所需的数据 但是我尝试抓取的网站动态生成这些 XMLHttpRequest 其中包含我不知道如何提取的查询字符串参数 例如 您可以看到
  • 具有更新队列和输出队列的 Python 多处理

    如何编写使用两个队列的 Python 多进程脚本 一个作为工作队列 以一些数据开始 并且根据要并行化的函数的条件 动态接收更多任务 另一个收集结果并用于在处理完成后写下结果 我基本上需要根据我在初始项目中发现的内容在工作队列中放入更多任务