Luigi:如何将不同的参数传递给叶任务?

2024-03-02

这是我第二次尝试了解如何在 Luigi 中将参数传递给依赖项。第一个是here https://stackoverflow.com/questions/64837259/luigi-how-to-pass-arguments-to-dependencies-using-luigi-build-interface.

这个想法是:我有TaskC这取决于TaskB,这取决于TaskA,这取决于Task0。我希望整个序列始终完全相同,除了我希望能够控制哪个文件Task0从中读取,我们称之为path。 Luigi 的理念通常是每个任务只应该了解它所依赖的任务及其参数。这样做的问题是TaskC, TaskB, and TaskAall 都必须接受变量path其唯一目的是将其传递给Task0.

所以,Luigi为此提供的解决方案称为配置类 https://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes

这是一些示例代码:

from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")

class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")

class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")

(忽略所有output and run东西。它们就在那里,所以示例运行成功。)

上面例子的重点是控制线print(f"READING FROM {self.path}")没有任务 A、B、C 依赖path.

事实上,通过配置类我可以控制Task0争论。如果Task0没有通过path参数,它采用默认值,即config().path.

我现在的问题是,在我看来,这仅在解释器首次加载代码时的“构建时”起作用,但在运行时不起作用(我不清楚细节)。

所以这些都不起作用:

A)

if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

我不知道为什么这不起作用。

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)
    - 1 config(path=newpath_2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

这实际上是有道理的。有两个config类,我只设法改变了path其中之一。

Help?

编辑:当然,有path引用全局变量是可行的,但它不是通常 Luigi 意义上的参数。

EDIT2:我尝试了下面答案的第 1)点:

config具有相同的定义

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

我修正了指出的错误,即Task0 is now:

class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

最后我做到了:

if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")

这不行,Task0仍然得到path="defaultpath.txt".


因此,您要做的是使用参数创建任务,而不将这些参数传递给父类。这是完全可以理解的,有时我在尝试处理这个问题时感到很恼火。

首先,您正在使用config类错误。当使用 Config 类时,如中所述https://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes https://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes,您需要实例化该对象。所以,而不是:

class Task0(Task):
    path = Parameter(default=config.path)
    ...

你会使用:

class Task0(Task):
    path = Parameter(default=config().path)
    ...

虽然现在这可以确保您使用的是一个值而不是一个Parameter对象,它仍然没有解决你的问题。创建类时Task0, config().path将被评估,因此它不会分配参考config().path to path,而是调用时的值(始终是defaultpath.txt)。当以正确的方式使用该类时,luigi 将构造一个Task对象仅具有luigi.Parameter属性作为新实例上的属性名称,如下所示:https://github.com/spotify/luigi/blob/master/luigi/task.py#L436 https://github.com/spotify/luigi/blob/master/luigi/task.py#L436

所以,我看到了两条可能的前进道路。

1.) 第一个是像您一样在运行时设置配置路径,除了将其设置为Parameter像这样的对象:

config.path = luigi.Parameter(f"newpath_{i}")

然而,这需要做很多工作才能让你的任务使用config.path现在工作时,他们需要以不同的方式接受参数(创建类时无法评估默认值)。

2.)更简单的方法是简单地在配置文件中指定类的参数。如果你看https://github.com/spotify/luigi/blob/master/luigi/task.py#L825 https://github.com/spotify/luigi/blob/master/luigi/task.py#L825,你会看到ConfigLuigi 中的类,实际上只是一个Task类,所以你可以用它做任何你可以用类做的事情,反之亦然。因此,您可以在配置文件中添加以下内容:

[Task0]
path = newpath_1
...

3.) 但是,由于您似乎想要运行多个任务,每个任务都有不同的参数,所以我建议您像 Luigi 鼓励您那样,通过父级传递参数。然后你可以使用以下命令运行所有内容:

luigi.build([TaskC(arg=i) for i in range(3)])

4.) 最后,如果您确实需要摆脱传递依赖项,您可以创建一个ParamaterizedTaskParameter延伸luigi.ObjectParameter并使用任务实例的pickle作为对象。

在上述解决方案中,我强烈建议使用 2 或 3。1 很难进行编程,而 4 会创建一些非常丑陋的参数,并且更高级一些。

编辑:解决方案 1 和 2 更像是黑客,只是建议您将参数捆绑在DictParameter.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Luigi:如何将不同的参数传递给叶任务? 的相关文章

  • 阻止 TensorFlow 访问 GPU? [复制]

    这个问题在这里已经有答案了 有没有一种方法可以纯粹在CPU上运行TensorFlow 我机器上的所有内存都被运行 TensorFlow 的单独进程占用 我尝试将 per process memory fraction 设置为 0 但未成功
  • 有没有一种方法可以将python对象直接存储在mongoDB中而不需要序列化它们

    我在某处读到过 您可以使用 BSON 将 python 对象 更具体地说是字典 作为二进制文件存储在 MongoDB 中 但是现在我找不到任何与此相关的文档 有人知道具体如何做到这一点吗 没有办法在不序列化的情况下将对象存储在文件 数据库
  • 如何使用 colorchecker 在 opencv 中进行颜色校准?

    我有数码相机获取的色彩检查器图像 我如何使用它来使用 opencv 校准图像 按照以下颜色检查器图像操作 您是想问如何进行颜色校准或如何使用 OpenCV 进行校准 为了进行颜色校准 您可以使用校准板的最后一行 灰色调 以下是您应该逐步进行
  • TensorFlow:带有轴选项的 bincount

    在 TensorFlow 中 我可以使用 tf bincount 获取数组中每个元素的计数 x tf placeholder tf int32 None freq tf bincount x tf Session run freq feed
  • 从 Python 下载/安装 Windows 更新

    我正在编写一个脚本来自动安装 Windows 更新 我可以将其部署在多台计算机上 这样我就不必担心手动更新它们 我想用 Python 编写这个 但找不到任何关于如何完成此操作的信息 我需要知道如何搜索更新 下载更新并从 python 脚本安
  • Pyqt-如何因另一个组合框数据而更改组合框数据?

    我有一个表 有 4 列 这 4 列中的两列是关于功能的 一个是特征 另一个是子特征 在每一列中 所有单元格都有组合框 我可以在这些单元格中打开txt 我想 当我选择电影院作为功能时 我只想看到子功能组合框中的电影名称 而不是我的 数据 中的
  • 小部件之间的自定义信号

    尝试将信号从一个 gtk EventBox 子级发送到另一个 在 init HeadMode 第 75 行 上出现错误 类型错误 未知信号名称 消息发送 why usr bin env python coding utf8 import p
  • 如何在Python中高效地添加稀疏矩阵

    我想知道如何在Python中有效地添加稀疏矩阵 我有一个程序 可以将大任务分解为子任务 并将它们分配到多个 CPU 上 每个子任务都会产生一个结果 一个 scipy 稀疏矩阵 格式为 lil matrix 稀疏矩阵尺寸为 100000x50
  • 根据 Pandas 中的列表对多列进行排序

    感谢有关如何根据 pandas 中的倍数列表对给定多列进行排序的任何提示 如下所示 import pandas as pd sort a a d e sort b s1 s3 s6 sort c t1 t2 t3 df pd DataFra
  • 使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

    我想与 Python 共享这个特定的 Apache Spark 解决方案 因为它的文档非常贫乏 我想通过 KEY 计算 K V 对 存储在 Pairwise RDD 中 的平均值 示例数据如下所示 gt gt gt rdd1 take 10
  • 如何从 JSON 响应重定向?

    所以我尝试使用 Flask 和 Javascript 上传器 Dropzone 上传文件并在上传完成后重定向 文件上传正常 但在烧瓶中使用传统的重定向 return redirect http somesite com 不执行任何操作 页面
  • Python Django-如何从输入文件标签读取文件?

    我不想将文件保存在我的服务器上 我只想在下一页中读取并打印该文件 现在我有这个 index html
  • Python、subprocess、call()、check_call 和 returncode 来查找命令是否存在

    我已经弄清楚如何使用 call 让我的 python 脚本运行命令 import subprocess mycommandline lumberjack sleep all night work all day subprocess cal
  • 使用 Conda 更新特定模块会删除大量软件包

    我最近开始使用 Anaconda Python 发行版 因为它提供了许多开箱即用的数据分析库 使用 conda 创建环境和安装软件包也轻而易举 但是当我想更新 Python 本身或任何其他模块时 我遇到了一些严重的问题 我事先被告知我的很多
  • 从 python 检测 macOS 中的暗模式

    我正在编写一个 PyQt 应用程序 我必须添加一个补丁 以便在启用暗模式的 Macos 上可以读取字体 app QApplication Fix for the font colours on macos when running dark
  • Flask 应用程序的测试覆盖率不起作用

    您好 想在终端的 Flask 应用程序中测试 删除路由 我可以看到测试已经过去 它说 test user delete test app LayoutTestCase ok 但是当我打开封面时 它仍然是红色的 这意味着没有覆盖它 请有人向我
  • 字符串列表,获取n个元素的公共子串,Python

    我的问题可能类似于this https stackoverflow com questions 37514193 count the number of occurrences of n length not given string in
  • OSX 上的 locale.getlocale() 问题

    我需要获取系统区域设置来执行许多操作 最终我想使用 gettext 翻译我的应用程序 我打算在 Linux 和 OSX 上分发它 但我在 OSX Snow Leopard 上遇到了问题 python Python 2 5 2 r252 60
  • 通过 Web 界面执行 python 单元测试

    是否可以通过 Web 界面执行单元测试 如果可以 如何执行 EDIT 现在我想要结果 对于测试 我希望它们是自动化的 可能每次我对代码进行更改时 抱歉我忘了说得更清楚 EDIT 这个答案此时已经过时了 Use Jenkins https j
  • tkinter:打开一个带有按钮提示的新窗口[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 用户如何按下 tkinter GUI 中的按钮来打开新窗口 我只需要非常简单的解决方案 如果代码也能被解释那就太好了 这

随机推荐