我需要
1. run a select query on MYSQL DB and fetch the records.
2. Records are processed by python script.
我不确定我应该采取什么方式。 xcom 是去这里的路吗?另外,MYSQLOperator 只执行查询,不获取记录。有没有我可以使用的内置传输运营商?我如何在这里使用 MYSQL 挂钩?
你可能想使用一个使用钩子来获取数据的PythonOperator,
应用转换并将(现已评分)行运回其他地方。
有人可以解释如何进行同样的事情吗?
Refer - http://markmail.org/message/x6nfeo6zhjfeakfe http://markmail.org/message/x6nfeo6zhjfeakfe
def do_work():
mysqlserver = MySqlHook(connection_id)
sql = "SELECT * from table where col > 100 "
row_count = mysqlserver.get_records(sql, schema='testdb')
print row_count[0][0]
callMYSQLHook = PythonOperator(
task_id='fetch_from_testdb',
python_callable=mysqlHook,
dag=dag
)
这是正确的方法吗?
另外我们如何使用 xcoms 来存储以下 MySqlOperator 的记录?
t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
在过去的 90 分钟里,我真的一直在为此苦苦挣扎,这里有一个适合新手的更具声明性的方法:
from airflow.hooks.mysql_hook import MySqlHook
def fetch_records():
request = "SELECT * FROM your_table"
mysql_hook = MySqlHook(mysql_conn_id = 'the_connection_name_sourced_from_the_ui', schema = 'specific_db')
connection = mysql_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
sources = cursor.fetchall()
print(sources)
...your DAG() as dag: code
task = PythonOperator(
task_id = 'fetch_records',
python_callable = fetch_records
)
这会将数据库查询的内容返回到日志。
我希望这对其他人有用。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)