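You can do this with PowerShell and Python's subprocess and sys libraries.
In the Airflow > dags folder, create 2 files: main.py and caller.py.
main.py calls caller.py, and caller.py reaches into the (Windows) machine to run the file or routine.
This is the process:
Code for main.py: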
# Importing the libraries we are going to use in this example
from airflow import DAG
from datetime import datetime, timedelta
# In Airflow 2.x this operator is imported from airflow.operators.bash instead
from airflow.operators.bash_operator import BashOperator

# Defining some basic arguments
default_args = {
    'owner': 'your_name_here',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'retries': 0,
}

# Naming the DAG and defining when it will run (you can also use a crontab expression if you want the DAG to run, for example, every day at 8 am)
with DAG(
    'Main',
    schedule_interval=timedelta(minutes=1),
    catchup=False,
    default_args=default_args
) as dag:
    # Defining the tasks that the DAG will perform, in this case running a Python program through bash commands
    t1 = BashOperator(
        task_id='caller',
        bash_command="""
        cd /home/[Your_Users_Name]/airflow/dags/
        python3 caller.py
        """)
    # To add a second task: copy t1, rename the copy to t2 and have it call file.py

    # Defining the execution pattern
    t1
    # To run t1 first and then t2, use this line instead:
    # t1 >> t2
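Code for caller.py:
import subprocess, sys

# Start PowerShell on the Windows machine and run the target file
p = subprocess.Popen(
    ["powershell.exe",
     "cd C:\\Users\\[Your_Users_Name]\\Desktop; python file.py"],  # file .py
    # To run another kind of file, swap the command string above for one of these:
    # "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.html"  # file .html
    # "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.bat"   # file .bat
    # "cd C:\\Users\\[Your_Users_Name]\\Desktop; .\\file.exe"   # file .exe
    stdout=sys.stdout,
)
# Wait for the PowerShell command to finish
p.communicate()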
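If you switch between the .py, .html, .bat and .exe variants often, the call in caller.py can be wrapped in a small helper. This is only a sketch: build_powershell_command and run_on_windows are hypothetical names that are not part of the original answer, and it assumes, as the code above already does, that powershell.exe can be started from the environment where Airflow runs.

```python
import subprocess
import sys


def build_powershell_command(directory, target):
    """Return the argument list that changes into `directory` and runs `target`."""
    # Same shape as the call in caller.py: the executable, then one command string.
    return ["powershell.exe", f"cd {directory}; {target}"]


def run_on_windows(directory, target):
    """Run the command, wait for it to finish, and return the PowerShell exit code."""
    process = subprocess.Popen(
        build_powershell_command(directory, target),
        stdout=sys.stdout,
    )
    process.communicate()
    return process.returncode
```

For example, run_on_windows(r"C:\Users\[Your_Users_Name]\Desktop", "python file.py") issues the same command as caller.py, and passing r".\file.bat" as the target runs the batch file instead. Using a raw string for the target avoids the trap where "\f" in a normal Python string is read as a form feed character.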
How do you know whether your code works in Airflow? If the DAG runs, it works.
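A run that finishes is not always a run that succeeded, so it can help to hand the exit code of the Windows command back to Airflow: BashOperator marks a task as failed when its bash command exits with a non-zero code. The following is a minimal sketch, not the original author's code. run_and_report and main are hypothetical names, and it assumes PowerShell itself exits with a non-zero code when the command it ran failed.

```python
import subprocess
import sys


def run_and_report(command):
    """Run `command`, wait for it to finish, and return its exit code."""
    process = subprocess.Popen(command)
    process.communicate()
    return process.returncode


def main():
    # Placeholder path, as in caller.py above.
    code = run_and_report(
        ["powershell.exe", "cd C:\\Users\\[Your_Users_Name]\\Desktop; python file.py"]
    )
    # A non-zero exit makes `python3 caller.py` fail, and with it the Airflow task.
    sys.exit(code)
```

Calling main() at the end of caller.py would then make a failing file.py show up as a failed task in the Airflow UI instead of a green one.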