我在用MySqlHook建立连接气流数据库,并且我正在执行一些查询,但我需要在某处查看查询的结果(比如说日志),我怎样才能看到?
这是示例代码
t1 = MySqlOperator(
task_id='basic_mysql',
mysql_conn_id='airflow_db',
sql="select * from xcom",
dag=dag)
MySQL 运算符当前(撰写本文时为airflow 1.10.1)不支持在 XCom 中返回任何内容,因此目前的修复方法是自己编写一个小运算符。您可以直接在 DAG 文件中执行此操作:
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook
class ReturningMySqlOperator(MySqlOperator):
def execute(self, context):
self.log.info('Executing: %s', self.sql)
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
return hook.get_records(
self.sql,
parameters=self.parameters)
t1 = ReturningMySqlOperator(
task_id='basic_mysql',
mysql_conn_id='airflow_db',
sql="select * from xcom",
dag=dag)
def get_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='basic_mysql')
string_to_print = 'Value in xcom is: {}'.format(xcom)
# Get data in your logs
logging.info(string_to_print)
t2 = PythonOperator(
task_id='records',
provide_context=True,
python_callable=get_records,
dag=dag)
t1 >> t2
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)