我在一个 dag 中有 30 个单独的任务,它们之间没有依赖关系。这些任务运行相同的代码。唯一的区别是数据量,有些任务会在几秒钟内完成,有些任务需要 2 小时或更长时间。
问题是在追赶期间,在几秒钟内完成的任务会被需要几个小时才能完成才能进入下一个执行日期的任务所阻止。
我可以将它们分成单独的 dag,但这似乎很愚蠢,而且 30 个任务将来会增加到更多。
有没有办法在不同的执行时间在同一个 dag 中运行任务?就像任务一完成,就开始下一个执行日期,而不管其他任务的执行情况如何。
添加图片进行说明。基本上,我希望在第一排看到另外两个实心绿色方框,而第三排仍然落后。
Edit:
y2k-shubham 之后解释,我尝试去实现它。但它仍然不起作用。快速任务开始于2019-01-30 00
,一秒完成,并且不开始2019-01-30 01
因为缓慢的任务仍在运行。如果可以的话,跑步是理想的选择2019-01-30 01
, 2019-01-30 02
, 2019-01-30 03
...如果可能的话并行
添加代码示例
import time
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2019, 1, 30, 0, 0, 0),
'trigger_rule': TriggerRule.DUMMY
}
dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')
def fast(**kwargs):
return 1
def slow(**kwargs):
time.sleep(600)
return 1
fast_task = PythonOperator(
task_id='fast',
python_callable=fast,
provide_context=True,
priority_weight=10000,
pool='fast_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
slow_task = PythonOperator(
task_id='slow',
python_callable=slow,
provide_context=True,
priority_weight=500,
pool='slow_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
fast_task >> slow_task # not working