This is my second attempt at understanding how to pass arguments to dependencies in Luigi. The first one is here: https://stackoverflow.com/questions/64837259/luigi-how-to-pass-arguments-to-dependencies-using-luigi-build-interface
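The idea is: I have TaskC, which depends on TaskB, which depends on TaskA, which depends on Task0. I want the whole sequence to always be exactly the same, except that I want to be able to control which file Task0 reads from; let's call it path. Luigi's philosophy is normally that each task should only know about the tasks it depends on, and their parameters. The problem with this is that TaskC, TaskB, and TaskA all have to accept the variable path for the sole purpose of passing it on to Task0.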
So the solution Luigi provides for this is called configuration classes: https://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes
Here is some example code:
from pathlib import Path

import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter


class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")


class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)

    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()

    def output(self): return LocalTarget(f"task0{self.arg}.txt")


class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")


class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")


class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")
(Ignore all the output and run stuff. They are only there so that the example runs successfully.)
The point of the example above is to control the line print(f"READING FROM {self.path}") without having tasks A, B, C depend on path.
Indeed, with configuration classes I can control the Task0 argument. If Task0 is not passed a path parameter, it takes its default value, which is config().path.
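My problem now is that this appears to work only at "build time", when the interpreter first loads the code, but not at run time (the details are not clear to me).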
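To illustrate what I suspect is going on, here is a minimal plain-Python sketch with no Luigi involved. The names Settings and Reader are hypothetical stand-ins for config and Task0, and I am assuming that a Parameter default written in a class body behaves like any other class-body expression, i.e. it is evaluated once when the class is defined:

```python
class Settings:
    # Hypothetical stand-in for the Luigi config class.
    path = "defaultpath.txt"


class Reader:
    # Hypothetical stand-in for Task0.
    # This expression runs once, when the class body is executed,
    # so it captures whatever Settings.path holds at that moment.
    path = Settings.path

    def lazy_path(self):
        # This lookup happens on every call, so it sees later changes.
        return Settings.path


# Changing the setting after Reader has been defined...
Settings.path = "newpath_0"

reader = Reader()
print(reader.path)         # defaultpath.txt (captured at definition time)
print(reader.lazy_path())  # newpath_0 (looked up at call time)
```

If the same thing applies to my tasks, any value read inside the class body is frozen before my loop ever runs, while a value looked up inside a method would follow later changes.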
So neither of these works:
A)
if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 1 Task0(path=defaultpath.txt, arg=2)
- 1 TaskA(arg=2)
- 1 TaskB(arg=2)
- 1 TaskC(arg=2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
I'm not sure why this doesn't work.
B)
if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
- 1 Task0(path=defaultpath.txt, arg=2)
- 1 TaskA(arg=2)
- 1 TaskB(arg=2)
- 1 TaskC(arg=2)
- 1 config(path=newpath_2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
This actually makes sense. There are two config classes, and I only managed to change the path of one of them.
Help?
EDIT: Of course, having path reference a global variable works, but then it is not a Parameter in the usual Luigi sense.
EDIT2: I tried point 1) of the answer below:
config has the same definition
class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")
I fixed the mistake that was pointed out, i.e. Task0 is now:
class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)

    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()

    def output(self): return LocalTarget(f"task0{self.arg}.txt")
And finally I did:
if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")
This doesn't work; Task0 still gets path="defaultpath.txt".