如何在 Luigi 中启用动态需求?

2024-04-16

我在 Luigi 中构建了一个任务管道。由于该管道将在不同的上下文中使用,因此可能需要在管道的开头或结尾包含更多任务,甚至任务之间的依赖关系完全不同。

就在那时我想:“嘿,为什么要在我的配置文件中声明任务之间的依赖关系?”,所以我在 config.py 中添加了这样的内容:

PIPELINE_DEPENDENCIES = {
     "TaskA": [],
     "TaskB": ["TaskA"],
     "TaskC": ["TaskA"],
     "TaskD": ["TaskB", "TaskC"]
}

我对在整个任务中堆积参数感到恼火,所以在某个时候我只引入了一个参数,task_config,即每个Task拥有必要的所有信息或数据run()被储存了。所以我把PIPELINE_DEPENDENCIES就在那里。

最后,我会拥有每一个Task我定义了从两者继承luigi.Task和一个自定义 Mixin 类,它将实现动态requires(),看起来像这样:

class TaskRequirementsFromConfigMixin(object):
    task_config = luigi.DictParameter()

    def requires(self):
        required_tasks = self.task_config["PIPELINE_DEPENDENCIES"]
        requirements = [
            self._get_task_cls_from_str(required_task)(task_config=self.task_config)
            for required_task in required_tasks
        ]
        return requirements

    def _get_task_cls_from_str(self, cls_str):
        ...

不幸的是,这不起作用,因为运行管道给我带来了以下结果:

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 were left pending, among these:
    * 4 was not granted run permission by the scheduler:
        - 1 TaskA(...)
        - 1 TaskB(...)
        - 1 TaskC(...)
        - 1 TaskD(...)

Did not run any tasks
This progress looks :| because there were tasks that were not granted run permission by the scheduler

===== Luigi Execution Summary =====

和很多

DEBUG: Not all parameter values are hashable so instance isn't coming from the cache

虽然我不确定这是否相关。

所以: 1.我的错误是什么?可以修复吗? 2.还有其他方法可以实现这一目标吗?


我意识到这是一个老问题,但我最近学习了如何启用动态依赖关系。我能够通过使用 WrapperTask 并生成一个字典理解(如果您愿意的话,您也可以做一个列表)以及我想要传递给 require 方法中其他任务的参数来完成此任务。

像这样的事情:

class WrapperTaskToPopulateParameters(luigi.WrapperTask):
    date = luigi.DateMinuteParameter(interval=30, default=datetime.datetime.today())

    def requires(self):
    base_params = ['string', 'string', 'string', 'string', 'string', 'string']
    modded_params = {modded_param:'mod' + base for base in base_params}
    yield list(SomeTask(param1=key_in_dict_we_created, param2=value_in_dict_we_created) for key_in_dict_we_created,value_in_dict_we_created in modded_params.items())

如果有兴趣,我也可以发布一个使用列表理解的示例。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Luigi 中启用动态需求? 的相关文章

随机推荐