如何使用生成器形成多个管道?

2024-01-26

我正在使用 python,并且正在尝试找到一种将多个生成器优雅地链接在一起的方法。问题的一个例子是,例如,有一个根生成器,它提供某种数据,每个值都像级联一样传递给它的“子级”,而级联反过来可能会修改它们接收的对象。我可以走这条路:

for x in gen1:
    gen2(x)
    gen3(x)

但它很丑而且不优雅。我正在考虑一种更实用的做事方式。


您可以将生成器变成协程,这样它们就可以send()并互相接收值(使用(yield)表达式)。这将使每个人都有机会更改他们收到的值,和/或将它们传递到下一个生成器/协程(或完全忽略它们)。

请注意,在下面的示例代码中,我使用了一个名为coroutine“启动”生成器/协程函数。这使得它们在第一次执行之前就执行yield表达/陈述。这是 YouTube 视频中的一个稍加修改的版本,该视频是一个非常有启发性的演讲,标题为关于协程和并发的有趣课程 https://www.youtube.com/watch?v=Z_OAlIhXziw这是 Dave Beazley 在 PyCon 2009 上发表的。

正如您应该能够从生成的输出中看到的,数据值正在由通过单个配置的每个管道进行处理send()到头部协程,然后头部协程有效地将其“复用”到每个管道中。由于每个子协程也执行此操作,因此可以建立一个复杂的进程“树”。

import sys

def coroutine(func):
    """ Decorator to "prime" generators used as coroutines. """
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)  # Create coroutine generator function.
        next(cr)                   # Advance to just before its first yield.
        return cr
    return start

def pipe(name, value, divisor, coroutines):
    """ Utility function to send values to list of coroutines. """
    print('  {}: {} is divisible by {}'.format(name, value, divisor))
    for cr in coroutines:
        cr.send(value)

def this_func_name():
    """ Helper function that returns name of function calling it. """
    frame = sys._getframe(1)
    return frame.f_code.co_name


@coroutine
def gen1(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 2 == 0:  # Only pipe even values.
            pipe(this_func_name(), value, 2, coroutines)

@coroutine
def gen2(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 4 == 0:  # Only pipe values divisible by 4.
            pipe(this_func_name(), value, 4, coroutines)

@coroutine
def gen3(*coroutines):
    while True:
        value = (yield)     # Receive values sent here via "send()".
        if value % 6 == 0:  # Only pipe values divisible by 6.
            pipe(this_func_name(), value, 6, coroutines)

# Create and link together some coroutine pipelines.
g3 = gen3()
g2 = gen2()
g1 = gen1(g2, g3)

# Send values through both pipelines (g1 -> g2, and g1 -> g3) of coroutines.
for value in range(17):
    print('piping {}'.format(value))
    g1.send(value)

Output:

piping 0
  gen1: 0 is divisible by 2
  gen2: 0 is divisible by 4
  gen3: 0 is divisible by 6
piping 1
piping 2
  gen1: 2 is divisible by 2
piping 3
piping 4
  gen1: 4 is divisible by 2
  gen2: 4 is divisible by 4
piping 5
piping 6
  gen1: 6 is divisible by 2
  gen3: 6 is divisible by 6
piping 7
piping 8
  gen1: 8 is divisible by 2
  gen2: 8 is divisible by 4
piping 9
piping 10
  gen1: 10 is divisible by 2
piping 11
piping 12
  gen1: 12 is divisible by 2
  gen2: 12 is divisible by 4
  gen3: 12 is divisible by 6
piping 13
piping 14
  gen1: 14 is divisible by 2
piping 15
piping 16
  gen1: 16 is divisible by 2
  gen2: 16 is divisible by 4
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用生成器形成多个管道? 的相关文章

  • Django 中的 Rpy2 错误 - 未为“”类型的对象定义转换“py2rpy”

    我以前从未使用过 R 并且正在尝试使用 rpy2 从 python 调用 R 函数 它可以在独立的 python 终端上运行 但不能在 Django 中运行 但rpy2似乎无法将python字符串转换为r对象 我正在使用同事提供的自定义库
  • 在 pandas 中单独打印一列的原始值?

    我有一个数据框 df pd DataFrame name george age 23 name anna age 26 现在我想检索乔治的年龄 df df name george age 但这会输出一些额外的信息以及原始值 0 23 Nam
  • 如何让python优雅地失败?

    我只是想知道如何让 python 在所有可能的错误中以用户定义的方式失败 例如 我正在编写一个处理 大 项目列表的程序 并且某些项目可能不符合我定义的格式 如果 python 检测到错误 它目前只会输出一条丑陋的错误消息并停止整个过程 但是
  • 正则表达式,选择最接近的匹配

    假设以下单词序列 BLA text text text text text text BLA text text text text LOOK text text text BLA text text BLA 我想做的是将 BLA 中的文本
  • 使用多级解决方案计算二维网格中的最近邻

    我有一个问题 在 x y 大小的网格中 我提供了一个点 并且我需要找到最近的邻居 在实践中 我试图在 pygame 中找到距离光标最近的点 该点跨越颜色距离阈值 计算如下 sqrt rgb1 0 rgb2 0 2 rgb1 1 rgb2 1
  • Django 不会以奇怪的错误“AttributeError: 'module' object has no attribute 'getargspec'”启动

    我对 Django 的内部结构有点缺乏经验 所以我现在完全陷入困境 它昨天起作用了 但我不记得我改变过任何重要的东西 当我转身时DEBUG True任何恰好位于列表中第一个的模块上都有堆栈跟踪 Traceback most recent c
  • 按多个键分组并对字典列表的值进行汇总/平均值

    在Python中按多个键进行分组并对字典列表进行汇总 平均值的最Pythonic方法是什么 假设我有一个字典列表 如下所示 input dept 001 sku foo transId uniqueId1 qty 100 dept 001
  • 在 iPython/pandas 中绘制多条线会生成多个图

    我试图了解 matplotlib 的状态机模型 但在尝试在单个图上绘制多条线时遇到错误 据我了解 以下代码应该生成包含两行的单个图 import pandas as pd import pandas io data as web aapl
  • Pandas:将 pytz.FixedOffset 应用于系列

    我有一个带有timestamp列看起来像这样 0 2020 01 26 05 00 00 08 00 1 2020 01 26 06 00 00 08 00 Name timestamp dtype datetime64 ns pytz F
  • pandas 中连续数据的平行坐标图

    pandas 的 parallel coordinates 函数非常有用 import pandas import matplotlib pyplot as plt from pandas tools plotting import par
  • PyArmor - 打包为一个可执行文件

    当我执行此命令时 您好 使用 PyArmor pyarmor pack main py 它将它打包到一个名为的文件夹中dist里面包含我的 exe 以及许多 Python 扩展文件 据我所知 PyArmor 使用 PyInstaller 来
  • 根据第三个变量更改散点图中的标记样式

    我正在处理多列字典 我想绘制两列 然后根据第三列和第四列更改标记的颜色和样式 我很难改变 pylab 散点图中的标记样式 我的方法适用于颜色 不幸的是不适用于标记样式 x 1 2 3 4 5 6 y 1 3 4 5 6 7 m k l l
  • 用于多个窗口的 Tkinter 示例代码,为什么按钮无法正确加载?

    我正在编写一个程序 应该 按一下按钮即可打开一个窗口 按另一个按钮关闭新打开的窗口 我使用类 以便稍后可以将代码插入到更大的程序中 但是 我无法正确加载按钮 import tkinter as tk class Demo1 tk Frame
  • 线性同余生成器 - 如何选择种子和统计检验

    我需要做一个线性同余生成器 它将成功通过所选的统计测试 我的问题是 如何正确选择发电机的数字以及 我应该选择哪些统计检验 我想 均匀性的卡方频率测试 每代收集10 000个号码的方法 将 0 1 细分为10个相等的细分 柯尔莫哥洛夫 斯米尔
  • 更换壳牌管道[重复]

    这个问题在这里已经有答案了 在 subprocess 模块的 Python 2 7 文档中 我找到了以下片段 p1 Popen dmesg stdout PIPE p2 Popen grep hda stdin p1 stdout stdo
  • 在 matplotlib 中绘制多边形的并集[重复]

    这个问题在这里已经有答案了 我正在尝试绘制几个多边形的并集matplotlib 具有一定的 alpha 水平 我当前的代码在交叉点处颜色较深 有没有办法让交叉路口与其他地方的颜色相同 import matplotlib pyplot as
  • 在Python中连续解析文件

    我正在编写一个脚本 该脚本使用 HTTP 流量行解析文件 并取出域 目前仅将它们打印到屏幕上 我正在使用 httpry 将流量连续写入文件 这是我用来删除域名的脚本 usr bin python import re input open r
  • Python 导入非常慢 - Anaconda python 2.7

    我的 python import 语句变得非常慢 我使用 Anaconda 包在本地运行 python 2 7 导入模块后 我编写的代码运行得非常快 似乎只是导入需要很长时间 例如 我使用以下代码运行了一个 tester py 文件 imp
  • Streamlabs API 405 响应代码

    我正在尝试使用Streamlabs API https dev streamlabs com Streamlabs API 使用 Oauth2 来创建应用程序 因此 首先我将使用我的应用程序的用户发送到一个授权链接 其中包含我的应用程序的客
  • 如何更改matplotlib中双头注释的头大小?

    Below figure shows the plot of which arrow head is very small 我尝试了下面的代码 但它不起作用 它说 引发 AttributeError 未知属性 s k 属性错误 未知属性头宽

随机推荐

  • Conda 不会删除包

    我的命令是否做错了什么 我无法删除 Keras conda remove name myEnv keras Collecting package metadata repodata json done Solving environment
  • CORS:预检通过,主请求完成 w/200,但浏览器仍然存在 Origin 错误

    我正在向运行 Express 的节点服务器发送 CORS ajax 请求 在服务器日志和 js 控制台中 我可以看到预检 OPTIONS 请求成功 然后 主请求也在服务器上成功 并以 200 和我认为正确的标头进行响应 但是 在 Chrom
  • 实施 Sitecore Multisite Robots.txt 文件

    如何为同一 Sitecore 解决方案上托管的每个网站实现不同的 robots txt 文件 我想从 sitecore 项目中动态读取 robots txt 您需要执行以下步骤 1 创建并实现您的自定义通用 ashx 处理程序 2 在 we
  • 滚动到时开始动画

    我花了一整天的时间寻找一种简单的方法来让我的动画在滚动到页面上的特定位置后开始 My css animation width 50 float left position relative webkit animation name tes
  • LaunchD Plist 不工作

    编辑 看起来好像我在控制台中收到错误 com apple launchd com xxxx adbind 57 退出 代码 1 那有什么意思 还 如果我加载使用 launchctl 命令登录的 launchd plist 文件 它工作正常
  • 在 web.config 中存储值 - appSettings 或 configSection - 哪个更有效?

    我正在编写一个可以使用几个不同主题的页面 并且我将在 web config 中存储有关每个主题的一些信息 创建一个新的sectionGroup并将所有内容存储在一起 还是将所有内容都放在appSettings中 是否更有效 配置部分解决方案
  • 如何将现有的 Mercurial 存储库转换为使用子存储库并保持历史记录完整?

    我一直在阅读有关子存储库以及如何使用转换扩展和文件映射将现有文件夹从 Mercurial 存储库提取到子存储库的内容 我可以成功地做到这一点 如果我有以下文件夹结构 C Project Project root txt Project Su
  • 防止所有非系统 shell 扩展加载到 GetOpenFileName、CFileDialog、IFileOpenDialog 等中

    我正在寻找一种编程方式来使用资源管理器 shell 提示用户输入文件名 并且我只希望加载系统 shell 扩展 我寻找此功能的原因是我想消除第 3 方 shell 扩展作为崩溃和其他不确定行为的可能原因 理想情况下 在某个地方有一个我错过的
  • Visual Studio Online 网站以调试模式部署到 Azure

    我创建了一个 ASP NET MVC Web 应用程序项目 将其提交到 Visual Studio Online Git 存储库 并链接到 Windows Azure 网站以进行自动部署 如果所有单元测试都成功 我的项目包含默认的 Web
  • 如何在 Angular2 上使用 onBlur 事件?

    如何检测 Angular2 中的 onBlur 事件 我想用它
  • 在 Perl 中计算字符串中单词数的最快方法是什么?

    我有一些函数在各种文本上运行了超过一百万次 这意味着这些函数的微小改进总体上会带来巨大的收益 目前 我注意到所有涉及字数统计的功能的运行时间都比其他功能要长得多 所以我想尝试以不同的方式进行字数统计 基本上 我的函数所做的就是获取许多具有与
  • 阻止用户输入字母? C++

    你好 我是 C 新手 但我有一个小问题 那就是我必须阻止用户在数字部分输入字母 我做了一次尝试 虽然有效 但很狡猾 因为它将允许用户继续 然后告诉他们出了问题并重新启动应用程序 我如何验证它以显示错误消息 告诉他们这不是数字并让他们重新输入
  • 如何根据对象索引合并两个列表 - 保留属性?

    我想合并两个列表 保留每个对象的索引 mylist lt list 1 NULL 2 otherlist lt list NULL 3 NULL 4 5 6 Desired list 1 3 2 4 5 6 my try suppressW
  • System.IO.IOException:使用 Directory.EnumerateDirectories 时句柄无效

    我有一个窗口服务可以将东西导入到我的系统中 有时我会收到 System IO IOException 句柄无效 有谁知道为什么会出现这种异常 下面你可以看到触发异常的代码 foreach string directoryPath in Di
  • 使用 OSMnx 提取约束多边形

    我正在使用 OSMnx 包来解决以下任务 地图上有一个由纬度和经度定义的点 X 我们需要检测包含该点 X 并受到相邻道路约束的多边形 所以基本上点 X 位于多边形内部 相邻道路将是该多边形的边界 到目前为止 我只设法在地图上绘制图形的可视化
  • 从函数返回后更改 ggplot 对象的点大小

    假设我有一个返回 ggplot 对象的函数 getplot function x rnorm 16 y rnorm 16 dat data frame x y myplot ggplot dat aes x y geom point myp
  • 所有公共结构都会产生对隐式删除的默认构造函数的调用

    我明白什么call to implicitly deleted default constructor意思是但我不明白为什么我会在这里得到它 struct TransformData enum type t kDelay 0 kScale
  • 编写一个可以采用 Int 或 Double 值的 scala 函数

    我编写了一个函数来接受以下类型的值 1 数组 1 0 2 0 3 0 这是一个元组 其中 Int 是第一个值 下一个是双精度数组 我还希望它也接受整数数组 我写的函数如下 def getCountsAndAverages T Paramet
  • C# 使用 NumberLong 将 mongodb bson 转换为 json

    我有一个动态 mongoDB bson 文档 我尝试将其反序列化为 C Dictionary 对象 bson 文档包含 LongNumber 类型 我遇到了麻烦 var json entity BsonValue ToJson JsonCo
  • 如何使用生成器形成多个管道?

    我正在使用 python 并且正在尝试找到一种将多个生成器优雅地链接在一起的方法 问题的一个例子是 例如 有一个根生成器 它提供某种数据 每个值都像级联一样传递给它的 子级 而级联反过来可能会修改它们接收的对象 我可以走这条路 for x