我最近开始玩Luigi
,我想了解如何使用它不断地将新数据附加到现有的目标文件中。
想象一下,我每分钟都会 ping 一个 api 来检索新数据。因为一个Task
仅在以下情况下运行Target
尚不存在,一个简单的方法是通过当前参数来参数化输出文件datetime
。这是一个简单的例子:
import luigi
import datetime
class data_download(luigi.Task):
date = luigi.DateParameter(default = datetime.datetime.now())
def requires(self):
return []
def output(self):
return luigi.LocalTarget("data_test_%s.json" % self.date.strftime("%Y-%m-%d_%H:%M"))
def run(self):
data = download_data()
with self.output().open('w') as out_file:
out_file.write(data + '\n')
if __name__ == '__main__':
luigi.run()
如果我安排这个任务每分钟运行一次,它就会执行,因为当前时间的目标文件还不存在。但它每分钟创建 60 个文件。我想做的是确保所有新数据最终都在同一个文件中最终。实现这一目标的可扩展方法是什么?有什么想法、建议欢迎留言!
你不能。作为doc https://luigi.readthedocs.io/en/stable/api/luigi.local_target.html#luigi.local_target.LocalTarget for LocalTarget
says:
参数: mode (str) – 模式 r 以只读模式打开 FileSystemTarget,而 w 将以写入模式打开 FileSystemTarget。子类可以实现附加选项。
IE。仅有的r
or w
模式是允许的。附加选项,例如a
需要延长LocalTarget
班级;尽管它打破了 Luigi 任务执行所需的幂等性。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)