正确完成管道中的多处理

2024-03-27

我想知道多重处理是如何正确完成的。假设我有一个清单[1,2,3,4,5]由函数生成f1这是写到Queue(左绿色圆圈)。现在我启动两个从该队列中提取的进程(通过执行f2在过程中)。他们处理数据,例如:将值加倍,并将其写入第二个队列。现在,函数f3读取该数据并将其打印出来。

在函数内部有一种循环,试图永远从队列中读取。我该如何停止这个过程?

Idea 1

f1不仅发送列表,还发送None对象或自定义对象,class PipelineTerminator: pass或者一些这样的东西正在一路向下传播。f3现在等待None来了,当它在那里时,它就脱离了循环。问题:两者之一可能f2s 读取并传播None而另一个仍在处理数字。然后最后一个值就丢失了。

Idea 2

f3 is f1。所以函数f1生成数据和管道,生成进程f2并提供所有数据。在产卵和进食后,它会监听第二个管道,简单地计算和处理接收到的对象。因为它知道输入了多少数据,所以它可以终止正在执行的进程f2。但如果目标是建立处理管道,则不同的步骤应该是可分离的。所以f1, f2 and f3是管道的不同元素,并且昂贵的步骤是并行完成的。

Idea 3

管道的每个部分都是一个函数,该函数根据需要生成进程并负责管理它们。它知道有多少数据传入以及有多少数据已返回(使用yield或许)。所以传播是安全的None object.

setup child processes 

execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.

(除了使用线程之外,也许人们可以想到一种更聪明的解决方案。)

So ...

我已经按照想法 2 实现了一个解决方案,输入并等待结果到达,但它并不是真正的将独立功能插入在一起的管道。它可以完成我必须管理的任务,但很难维护。

我现在想听听您如何实现管道(在一个进程中使用生成器函数等很容易,但在多个进程中?)并通常管理它们。


With MPipe http://vmlaker.github.io/mpipe模块,只需执行以下操作:

from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:
    p.put(task)

以上运行4道工序:

  • two对于第一阶段(函数f1)
  • one对于第二阶段(函数f2)
  • and one更多用于为管道提供数据的主程序。

The MPipe 食谱 http://vmlaker.github.io/mpipe/cookbook.html#operate-the-pipeline提供了一些关于如何使用内部关闭进程的解释None作为最后一个任务。

要运行代码,请安装 MPipe:

virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py

Output:

2
4
6
8
10
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

正确完成管道中的多处理 的相关文章

  • Python正则表达式替换除特定单词之外的所有内容

    我正在尝试执行以下操作用正则表达式 import re x re compile going you words to replace s I am going home now thank you string to modify pri
  • Python argparse 作为函数

    以这种方式获取命令行参数有什么本质上的错误吗 我的意思是把参数解析放入它自己的函数中 它会被认为是非 Pythonic 或更严重吗 usr bin python import argparse def getArgs argv None p
  • 配置 PIP 以在代理后面工作

    我已经安装了 python 3 4 3 附带pip 我想从代理后面使用 pip 所以我执行了以下操作 Created C Users foo pip pip ini并添加了代理配置部分 proxy export http proxy my
  • 如何在anaconda python 3.6上安装tensorflow

    我使用 anaconda 包安装了新版本的 python 3 6 但是我无法安装张量流 总是收到这样的错误 tensorflow gpu 1 0 0rc2 cp35 cp35m win amd64 whl 在此平台上不受支持 如何在 ana
  • 熊猫加入具有不同索引级别/日期时间的数据帧?

    嗨 我有两个 DataFrame 如下所示 dineType menuName unique columns date y m d
  • 来自多元 t 分布的样本 python

    我想知道Python中是否有一个从多元学生t分布中采样的函数 我有包含 14 个元素的均值向量 14x14 协方差矩阵和自由度 我想从这个 t 分布中采样一个向量 对于一维情况 我使用 stats t rvs df loc scale 并且
  • 使用 scipy、python、numpy 进行非线性 e^(-x) 回归

    下面的代码为我提供了一条最佳拟合线的平坦线 而不是沿着 e x 模型的一条适合数据的漂亮曲线 谁能告诉我如何修复下面的代码以使其适合我的数据 import numpy as np import matplotlib pyplot as pl
  • 使用 Python 打开新窗口时,selenium window_handles 不正确

    我想使用 selenium 和 Python 在一个浏览器中打开多个选项卡 并通过多个选项卡同时抓取实时投注赔率 网站主页生成游戏列表 但是 除非您找到游戏元素并使用 click 该网站是 ajax 密集型 否则无法获取游戏链接 这会在同一
  • 如何使用Peewee查询多个相似的数据库?

    我遇到了使用 Peewee 查询多个数据库的问题 我有 2 个现有的 mysql 数据库 让我们将它们命名为 A 和 B 结构相似 因为它是两个 Bugzilla 数据库 我使用 Pwiz 生成模型 modelsA py 和 modelsB
  • pandas groupby 并转换为 json 列表

    我有一个如下所示的 pandas 数据框 idx f1 f2 f3 1 a a b 2 b a c 3 a b c 87 e e e 我需要将其他列转换为基于索引列的字典列表 所以 最终结果应该是 idx features 1 f1 a f
  • Python3 - 如何将字符串转换为十六进制

    我正在尝试将字符串逐个字符转换为十六进制 但我无法在Python3中弄清楚它 在较旧的 python 版本中 我的以下内容有效 test This is a test for c in range 0 len test print 0x s
  • 错误:tensorflow:无法匹配检查点的文件

    我正在训练一个张量流模型 在每个时期之后我都会保存模型状态并腌制一些数组 到目前为止 我的模型执行了 2 个纪元 并且保存状态的文件夹包含以下文件 checkpoint model e knihy preprocessed txt e0 c
  • 如何检查discord.py中的所有者

    我试图让这个命令只有所有者才能运行它 是否有办法检查服务器的最高角色或创建者 我尝试了 commands is owner 但这仅检查某人是否是机器人的所有者 Guild owner https discordpy readthedocs
  • NumPy 根据另一个数组中的值对第三个数组中的每个匹配元素求和一个数组

    我有两个 numpy 数组 一个包含值 另一个包含每个值类别 values np array 1 2 3 4 5 6 7 8 9 10 valcats np array 101 301 201 201 102 302 302 202 102
  • 自动创建带有文件输出的目录[重复]

    这个问题在这里已经有答案了 假设我想制作一个文件 filename foo bar baz txt with open filename w as f f write FOOBAR 这给出了一个IOError since foo bar不存
  • 获取列的 [0, x] 元素的最小值

    我需要计算一列 其中值是对其他列进行矢量化运算的结果 df new col df col1 min 0 df col2 然而 事实证明我不能像上面的语法一样使用 min 那么 获得 pandas 列的零和给定值之间的最小值的正确方法是什么
  • 如何仅在按下某个键时触发鼠标单击?在Python中

    我想制作一个程序 或者当我单击某个键时 鼠标会自动单击 只要我单击该键 如果我不单击该键 它就会停止 我不希望只在触摸按键一次时才发生点击 而是只要按住按键就发生点击 也可以像雷蛇突触鼠标一样按下鼠标左键触发点击 任何想法 EDIT 1 这
  • 如何单独捕获这些异常?

    我正在编写一个与 Quickbooks 交互的 Python 程序 连接到 Quickbooks 时 根据问题的不同 我可能会遇到以下两个常见异常之一 pywintypes com error 2147352567 Exception oc
  • 如何从 PyObject 获取指向字符串的 char*

    我怎样才能得到一个char from a PyObject它指向一个字符串 例如 这是 python 脚本 Test Connect 272 22 20 65 1234 这是 C 代码 static PyObject Connect PyO
  • 内置模块位于哪里?

    我尝试查找列出的所有目录sys path但我找不到任何builtins py文件 那么它在哪里呢 从字面上看 该模块内置于 python 解释器中 gt gt gt import builtins gt gt gt builtins

随机推荐

  • 使用 Git 复制子模块

    我在 Git 中有一个项目 它有几个子模块 我需要下载这些子模块和可用的文件才能使用主项目 并且为了使子模块工作 我需要它们自己的子模块可用等等 为了进行设置 我使用递归地初始化子模块git submodule update init re
  • 具有三级深度连接的范围

    My Program桌子有很多Measures My Measure桌子有很多Targets My Target表有一列名为 money 我的 ActiveRecord 查询如下所示 programs2 Program includes m
  • 为什么当 TreeModel 添加新节点时我的 JTree 不更新?

    我正在使用一个DefaultTreeModel填充了覆盖DefaultMutableTreeNode它支持选择性地更改树中节点的显示字符串 如下面的代码所示 在我的表单中 我通过在单独的类中创建新节点来填充树 然后通过主数据类型的包装类将它
  • 计算不同的数,直到满足基于另一行的特定条件

    我有以下 df 原始df Step CampaignSource UserId 1 Banana Jeff 1 Banana John 2 Banana Jefferson 3 Website Nunes 4 Banana Jeff 5 A
  • 精确 HH:MM:SS 时间字符串的正则表达式模式

    我想准确验证字符串时间格式hh mm ss String 我的意思正是如此每个小时 分钟 秒必须是2 digits Also 仅接受逻辑值 like 小时 从 00 到 23 分钟 从 00 到 59 秒 从 00 到 59 当我检查时HH
  • 为什么我的 Oracle JVM 会为一个简单的“Hello World”程序创建所有这些对象?

    我正在玩jmap并发现简单的 Hello World Java 程序创建了数千个对象 这是对象的截断列表Oracle JVM 更新 131启动时创建 num instances bytes class name 1 402 4903520
  • geom_text 仅位于堆积条形图的顶部

    我想仅在堆叠条形图的顶部添加标签 这是我的数据框 create data frame building lt c Burj nKhalifa Zifeng nTower Bank of nAmerica Tower Burj Al Arab
  • 正则表达式,我可以排除字符对吗?

    如何从正则表达式中排除字符对 我正在尝试获取一个包含 5 个字母数字字符的正则表达式 后跟 除 XX 和 AD 之外的任何内容 后跟 XX So D22D0ACXX 会匹配 但是下面两个不会匹配 D22D0ADXX D22D0XXXX 我的
  • 将代理模型实例分配给外键

    我有一个 django auth 用户代理模型 它附加了一些额外的权限 如下所示 class User User class Meta proxy True permissions write messages May add new me
  • 中间人控制台:如何使用?

    我想知道如何使用middleman console 是一个简单的irb吗 我能用它做什么与简单的 irb 不同的事情 middleman console options Start an interactive console in the
  • 如何在量角器中设置单选按钮值

    我是量角器新手 我正在尝试使用量角器设置单选按钮值 我在互联网和 SO 上搜索了徒劳的答案 html
  • 使用 Hilt 提供首选项数据存储

    我试图提供一个共同的DataStore
  • 使用 Amazon S3 的 Active Storage 不使用指定的文件名进行保存,而是使用文件密钥进行保存

    我在使用 Active Storage 时遇到问题 当我上传到 Amazon S3 时 不是使用原始名称将文件保存在存储桶内 例如myfile zip它正在将其另存为key与该文件关联 所以在 Cyber duck 中我看到这样的东西 5Y
  • 使用 PHP 为 Apple 钱包通行证创建 PKCS #7 分离签名

    这对我来说是一个全新的概念 所以我在黑暗中拍摄 要创建签名文件 请创建 PKCS 7 分离签名 清单文件 使用与您的签名关联的私钥 证书 将 WWDR 中间证书包含在 签名 您可以从 Apple 网站下载此证书 将签名写入pass顶层的文件
  • 确保为一组定义一个且仅有一个默认值

    我有一个与地址表具有一对多关系的客户表 我想限制数据库以便客户with地址将始终有一个 且只有一个 默认地址 我可以很容易地添加一个约束 以确保每个客户只有一个默认地址 然而 我正在努力解决如何应用一个约束来确保地址始终被标记为默认地址 总
  • 距 X-Y-Z 日期还有多少天? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我正在尝试构建一个倒计时小部件 给定某个日期 PHP 中确定距该日期还有多少天的最简单方法是什么
  • 无法将可等待传递给 asyncio.run_coroutine_threadsafe

    我观察到asyncio run coroutine threadsafe函数不接受一般的可等待对象 我不明白这种限制的原因 观察 import asyncio async def native coro return asyncio cor
  • 如何生成 5 个总和为 100 的随机数 [重复]

    这个问题在这里已经有答案了 你知道一种将整数分成 5 组的方法吗 每组总数必须是随机的 但总数必须等于固定数字 例如我有 100 我想把这个数字分成 1 20 2 3 3 34 4 15 5 18 编辑 我忘了说 是的 平衡是一件好事 我想
  • Android 中的内存不足错误

    我创建了一款使用图像的游戏 并且我的游戏处于纵向和横向模式 我有两种不同的布局 当我从 1 方向切换到其他方向时 执行此操作 5 6 次后 会出现强制关闭错误 错误是这样的 原因 java lang OutOfMemoryError 位图大
  • 正确完成管道中的多处理

    我想知道多重处理是如何正确完成的 假设我有一个清单 1 2 3 4 5 由函数生成f1这是写到Queue 左绿色圆圈 现在我启动两个从该队列中提取的进程 通过执行f2在过程中 他们处理数据 例如 将值加倍 并将其写入第二个队列 现在 函数f