考虑以下任务:
import luigi
class YieldFailTaskInBatches(luigi.Task):
def run(self):
for i in range(5):
yield [
FailTask(i, j)
for j in range(2)
]
class YieldAllFailTasksAtOnce(luigi.Task):
def run(self):
yield [
FailTask(i, j)
for j in range(2)
for i in range(5)
]
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
The FailTask
失败如果j > 0
. The YieldFailTaskInBatches
产生FailTask
在 for 循环内多次,而YieldAllFailTasksAtOnce
生成数组中的所有任务。
如果我跑YieldFailTaskInBatches
,Luigi 运行第一个循环中生成的任务,并且其中一个任务失败(i = 0, j = 1
),路易吉不会产生其余的。如果我跑YieldAllFailTasksAtOnce
,Luigi 按预期运行所有任务。
我的问题是:我怎样才能告诉 Luigi 继续运行剩余的任务YieldFailTasksInBatches
,即使某些任务失败了?有可能吗?
我问的原因是我有大约 400k 个任务要触发。我不想一次触发所有任务,因为这会让 Luigi 花费太多时间构建每个任务的要求(它们可以有 1 个任务)和 400 个要求)。我当前的解决方案是分批生成它们,一次很少,但如果其中任何一个失败,任务就会停止,其余的也不会生成。
看起来这个问题 https://github.com/spotify/luigi/issues/2516如果实施可以解决这个问题,但我想知道是否还有其他方法。
这是非常黑客的,但它应该做你想要的:
class YieldAll(luigi.Task):
def run(self):
errors = list()
for i in range(5):
for j in range(2):
try:
FailTask(i, j).run()
except Exception as e:
errors.append(e)
if errors:
raise ValueError(f' all traceback: {errors}')
class FailTask(luigi.Task):
i = luigi.IntParameter()
j = luigi.IntParameter()
def run(self):
print("i: %d, j: %d" % (self.i, self.j))
if self.j > 0:
raise Exception("i: %d, j: %d" % (self.i, self.j))
所以基本上你是在 luigi 上下文之外运行任务。除非你输出一个目标,luigi永远不会知道任务是否已经运行。
luigi 唯一知道的任务是 YieldAll。如果任何 YieldAll 创建错误,代码将捕获该错误并将 YieldAll 任务设置为失败状态。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)