是的,这是可能的,我是这样让它工作的:
class MyCustomSensor(BaseSensorOperator):
    """Sensor that waits for an ``application_id`` value pushed to XCom by an upstream task."""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def poke(self, context):
        """Return True once the ``application_id`` XCom is available.

        The original version returned True unconditionally and crashed with a
        TypeError (``"We found " + None``) if the upstream task had not pushed
        the XCom yet; returning False instead makes the sensor keep poking.
        """
        application_id = context['ti'].xcom_pull(key='application_id')
        if application_id is None:
            # XCom not published yet -> keep waiting.
            return False
        print("We found " + application_id)
        return True
这是一个完整的 DAG 示例:
import os
import sys
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
# Manually-triggered DAG (no schedule); catchup disabled so past runs are not backfilled.
dag = DAG(
    'my_dag_name',
    description='DAG ',
    schedule_interval=None,
    start_date=datetime(2021, 1, 7),
    catchup=False,
    tags=["samples"],
)
class MyCustomSensor(BaseSensorOperator):
    """Sensor that waits for an ``application_id`` value pushed to XCom by an upstream task."""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def poke(self, context):
        """Return True once the ``application_id`` XCom is available.

        The original version returned True unconditionally and crashed with a
        TypeError (``"We found " + None``) if the upstream task had not pushed
        the XCom yet; returning False instead makes the sensor keep poking.
        """
        application_id = context['ti'].xcom_pull(key='application_id')
        if application_id is None:
            # XCom not published yet -> keep waiting.
            return False
        print("We found " + application_id)
        return True
def launch_spark_job(**context):
    """Simulate launching a Spark job and publish its application id via XCom.

    The hard-coded id stands in for the value a real submission would return;
    the downstream sensor pulls it under the same ``application_id`` key.
    """
    context['ti'].xcom_push(key='application_id',
                            value="application_1613995447156_11473")
# Task that "launches" the job and publishes the application id to XCom.
launch_spark_job_op = PythonOperator(
    task_id='test_python',
    dag=dag,
    python_callable=launch_spark_job,
    provide_context=True,  # Airflow 1.x: pass the task-instance context to the callable
)

# Sensor that waits for the id; reschedule mode frees the worker slot between pokes.
wait_spark_job_sens = MyCustomSensor(
    task_id='wait_spark_job',
    mode="reschedule",
    dag=dag,
)

# The sensor only runs after the job-launching task has pushed its XCom.
launch_spark_job_op >> wait_spark_job_sens