我在 Airflow 中有一个操作员:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
现在,我需要运行的实际查询有 24 行长。我想将其保存在一个文件中,并向操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理 SQL 所需的参数。
建议?
编辑:
这是我的代码:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
sql = '{{ templates_dict.sql }}',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' : TABLE_NAME},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
这给出:
jinja2.exceptions.UndefinedError:“templates_dict”未定义