您好,我想使用气流 hive 运算符执行 hive 查询并将结果输出到文件。我不想在这里使用 INSERT OVERWRITE 。
hive_ex = HiveOperator(
task_id='hive-ex',
hql='/sql/hive-ex.sql',
hiveconfs={
'DAY': '{{ ds }}',
'YESTERDAY': '{{ yesterday_ds }}',
'OUTPUT': '{{ file_path }}'+'csv',
},
dag=dag
)
做这个的最好方式是什么?
我知道如何使用 bash 运算符执行此操作,但想知道我们是否可以使用 hive 运算符
hive_ex = BashOperator(
task_id='hive-ex',
bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }}
/file_{{ds}}.json',
dag=dag
)
由于它是一个非常自定义的用例,因此最好的方法是扩展 Hive 运算符(或创建您自己的 Hive2CSVOperator)。实施取决于您是否可以通过 CLI 或 HiveServer2 访问 hive。
Hive CLI
我会首先尝试配置 Hive CLI 连接并添加hive_cli_params
, 按照Hive CLI 挂钩代码 https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L9,如果这不起作用,请扩展 Hook(这将使您可以访问所有内容)。
Hive服务器2
对于这种情况有一个单独的钩子(link https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L753)。它更方便一些,因为它有一个get_results
方法 (source https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L834) or to_csv
方法 (source https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L852).
The execute
操作员代码中的内容可能类似于:
def execute():
...
self.hook = HiveServer2Hook(...)
self.conn = self.hook.get_conn()
self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)