如果我有多个气流 dags 以及一些重叠的 python 包依赖项,我如何保留每个项目 deps。脱钩?例如。如果我在同一台服务器上有项目 A 和 B,我会用类似的东西运行它们。
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
基本上,想在相同的情况下运行 dags(例如,每个 dag 使用可能有重叠的包依赖的 python 脚本。我想单独开发(即,当想要更新时不必使用包更新所有代码)该软件包仅适用于一个项目))。请注意,我一直在使用BashOperator
运行 python 任务,例如...
do_stuff = BashOperator(
task_id='my_task',
bash_command='python /path/to/script.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
有办法让它发挥作用吗?气流是否还有其他最佳实践方法可以帮助人们解决(或避免)此类问题?
根据 apache-airflow 邮件列表的讨论,解决我使用各种 python 脚本执行任务的模块化方式的最简单答案是直接为每个脚本或模块调用 virtualenv python 解释器二进制文件,例如。
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
会翻译成类似的东西
do_stuff_a = BashOperator(
task_id='my_task_a',
bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
do_stuff_b = BashOperator(
task_id='my_task_b',
bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py'),
execution_timeout=timedelta(minutes=30),
dag=dag)
在气流中。
关于将参数传递给任务的问题,这取决于您想要传入的参数的性质。在我的例子中,某些参数取决于数据表在 dag 运行当天的样子(例如表中的最高时间戳记录等) .)。为了将这些参数添加到任务中,我有一个在此之前运行的“congif dag”。在配置 dag 中,有一个任务将“真实”dag 的参数生成为 python 字典并转换为 pickle 文件。然后“config” dag 有一个任务,它是TriggerDagRunOperator
激活“真实”dag,它具有从“config”dag 生成的 pickle 文件中读取的初始逻辑(在我的例子中,作为Dict
)我把它读进去bash_command
串状bash_command=f"python script.py {configs['arg1']}"
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)