我尝试每天仅运行一次 DAG00:15:00
(午夜 15 分钟),然而,它被安排了两次,间隔几秒钟。
dag = DAG(
'my_dag',
default_args=default_args,
start_date=airflow.utils.dates.days_ago(1) - timedelta(minutes=10),
schedule_interval='15 0 * * * *',
concurrency=1,
max_active_runs=1,
retries=3,
catchup=False,
)
该 Dag 的主要目标是检查新电子邮件,然后检查 SFTP 目录中的新文件,然后运行“合并”任务以将这些新文件添加到数据库中。
所有作业都是 Kubernetes Pod:
email_check = KubernetesPodOperator(
namespace='default',
image="g.io/email-check:0d334adb",
name="email-check",
task_id="email-check",
get_logs=True,
dag=dag,
)
sftp_check = KubernetesPodOperator(
namespace='default',
image="g.io/sftp-check:0d334adb",
name="sftp-check",
task_id="sftp-check",
get_logs=True,
dag=dag,
)
my_runner = KubernetesPodOperator(
namespace='default',
image="g.io/my-runner:0d334adb",
name="my-runner",
task_id="my-runner",
get_logs=True,
dag=dag,
)
my_runner.set_upstream([sftp_check, email_check])
所以,问题是似乎有两次运行DAG
预定的相隔几秒钟。他们不run同时进行,但一旦第一个完成,第二个就开始。
这里的问题是my_runner
job 的目的是每天只运行一次:它尝试创建一个以日期为后缀的文件,如果该文件已经存在,它会抛出异常,因此第二次运行总是会抛出异常(因为当天的文件第一次运行时已经正确创建)
由于一张(或两张)图像相当于一千个单词,因此如下:
您将看到已安排第一次运行“00:15 后 22 秒“(这很好......有时它会在这里或那里变化几秒钟)然后还有第二个似乎总是被安排好的"5800:15 UTC 后的秒数”(至少根据他们得到的名字)。因此,第一个运行正常,似乎没有其他运行......并且一旦完成运行,就进行第二次运行(安排在00:15:
58
)启动(并失败)。
一个“好”的:
一个“坏”的: