我在 Luigi 中构建了一个任务管道。由于该管道将在不同的上下文中使用,因此可能需要在管道的开头或结尾包含更多任务,甚至任务之间的依赖关系完全不同。
就在那时我想:“嘿,为什么要在我的配置文件中声明任务之间的依赖关系?”,所以我在 config.py 中添加了这样的内容:
PIPELINE_DEPENDENCIES = {
"TaskA": [],
"TaskB": ["TaskA"],
"TaskC": ["TaskA"],
"TaskD": ["TaskB", "TaskC"]
}
我对在整个任务中堆积参数感到恼火,所以在某个时候我只引入了一个参数,task_config
,即每个Task
拥有必要的所有信息或数据run()
被储存了。所以我把PIPELINE_DEPENDENCIES
就在那里。
最后,我会拥有每一个Task
我定义了从两者继承luigi.Task
和一个自定义 Mixin 类,它将实现动态requires()
,看起来像这样:
class TaskRequirementsFromConfigMixin(object):
task_config = luigi.DictParameter()
def requires(self):
required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
requirements = [
self._get_task_cls_from_str(required_task)(task_config=self.task_config)
for required_task in required_tasks
]
return requirements
def _get_task_cls_from_str(self, cls_str):
...
不幸的是,这不起作用,因为运行管道给我带来了以下结果:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 were left pending, among these:
* 4 was not granted run permission by the scheduler:
- 1 TaskA(...)
- 1 TaskB(...)
- 1 TaskC(...)
- 1 TaskD(...)
Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler
===== Luigi Execution Summary =====
和很多
DEBUG: Not all parameter values are hashable so instance isn't coming from the cache
虽然我不确定这是否相关。
所以:
1.我的错误是什么?可以修复吗?
2.还有其他方法可以实现这一目标吗?