使用airflow hive操作符并输出到文本文件

2023-12-29

您好,我想使用气流 hive 运算符执行 hive 查询并将结果输出到文件。我不想在这里使用 INSERT OVERWRITE 。

hive_ex = HiveOperator(
    task_id='hive-ex',
    hql='/sql/hive-ex.sql',
    hiveconfs={
        'DAY': '{{ ds }}',
        'YESTERDAY': '{{ yesterday_ds }}',
        'OUTPUT': '{{ file_path }}'+'csv',
    },
    dag=dag
)

做这个的最好方式是什么?

我知道如何使用 bash 运算符执行此操作,但想知道我们是否可以使用 hive 运算符

hive_ex = BashOperator(
    task_id='hive-ex',
    bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }} 
    /file_{{ds}}.json',
    dag=dag
)

由于它是一个非常自定义的用例,因此最好的方法是扩展 Hive 运算符(或创建您自己的 Hive2CSVOperator)。实施取决于您是否可以通过 CLI 或 HiveServer2 访问 hive。

Hive CLI

我会首先尝试配置 Hive CLI 连接并添加hive_cli_params, 按照Hive CLI 挂钩代码 https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L9,如果这不起作用,请扩展 Hook(这将使您可以访问所有内容)。

Hive服务器2

对于这种情况有一个单独的钩子(link https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L753)。它更方便一些,因为它有一个get_results方法 (source https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L834) or to_csv方法 (source https://github.com/apache/incubator-airflow/blob/5127ea34e110891c56e1ba9f70211091d13fa553/airflow/hooks/hive_hooks.py#L852).

The execute操作员代码中的内容可能类似于:

def execute():
  ...
  self.hook = HiveServer2Hook(...)
  self.conn = self.hook.get_conn()

  self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用airflow hive操作符并输出到文本文件 的相关文章

随机推荐

  • 如何在C中检查stdin是否为空

    我正在 重新 学习 C 编程 因此以下问题是为了理解 使用 scanf 我了解到 或者我自己发现 实际上不需要很长时间就能达到这一点 刷新 stdin 是一件好事 我进一步发现 在您的帮助下 fflush stdin 不是标准的操作 例如不
  • SQL 对于字符串来说太长

    我有以下SQL需要查询 这是一个有效的 SQL 不幸的是 对于 VBA 中的字符串来说太长了 有人知道运行此查询的解决方法吗 SQL SELECT A cust ky A incid id A OPEN TS A CLOSE TS A RE
  • 存在哪些算法可以最大限度地减少图中节点之间的事务数量?

    这个标题可能没有意义 假设如下 A欠B 5美元 C欠B 10美元 B 欠 D 15 美元 在这种基本情况下 存在三个事务 但可以减少为两个事务 A 给 D 5 美元 C 给 D 10 美元 给定一个更复杂的图 有哪些算法可以最小化交易总数
  • 并行编程卡在串行设备的 Concurrency::create_task 调用的第四个实例上

    我目前正在开发一个程序 其中Raspberry Pi 3将每隔100ms读取4个Arduino USB 设备通过串行通信发送的内容 UI 在 Concurrency create task 的第四次调用 serialDeviceIndex
  • Android:如何在应用程序启动时执行用户手册或应用程序说明

    我是 Android 开发新手 我需要在应用程序启动时显示像 Flip kart 应用程序这样的用户手册 请参考下图 您将会了解 我实际上想要实现什么 有没有任何标准方法可以实现这一目标 您可以手动完成所有工作 或者使用名为 Showcas
  • RStudio 中使用列表自动完成代码

    鉴于以下列表 我是否需要在更清晰的代码或自动完成之间进行选择 或者我可以两者兼得吗 我在 MacOS 10 10 5 上使用最新版本的 RStudio gt l lt list gt l a lt data frame ID stringi
  • 如何在可变参数包中找到“min”类型?

    min 类型是指比较的类型less比所有根据编译时函数 例如sizeof 我有一个实施草案 http ideone com eXLkrM 先介绍一下我面临的两个问题 include
  • VS Code 不断以带有 BOM 的 UTF-8 格式保存我的文件

    VS Code 不断以带有 BOM 的 UTF 8 格式保存我的文件 我的files encoding未设置设置 默认为utf8 在我的用户设置或工作区设置中 files autoGuessEncoding设置也未设置 默认为false 当
  • 有没有办法为 2 个具有不同包名称的应用程序提供 1 个 Firebase 数据库?

    我有 2 个应用程序 不同的包名称 适合 2 个不同的用户组 一个将数据放入数据库 另一个检索数据 因此 我只需要 1 个 firebase 数据库用于这两个应用程序 据我所知 Firebase 只允许 1 个数据库对应 1 个包名称 有没
  • NUnit 3.0 TestCase const 自定义对象参数

    我已经写好了课程SomeObject我想定义一个const要在我的中保留 重用的该对象的实例TestCases 我应该如何重写下面的代码来实现这种行为 TestFixture public class SomeObjectTests pri
  • Python:按任意列对文件进行排序,其中列包含时间值

    我有一个人的 txt 文件 每个人旁边都有两次 这是 txt 文件 Xantippe 09 00 11 00 Erica 10 00 12 06 Marcia 09 30 11 45 Elizabeth 10 15 12 10 Angela
  • 使用 AWS Java SDK v2 从 AWS EKS 获取身份验证令牌

    如何使用 AWS Java SDK v2 从 AWS EKS 获取 Kubernetes 身份验证令牌 然后可用于使用 Kubernetes SDK 向 Kubernetes 进行身份验证的身份验证令牌 换句话说 我想从 EKS 获取身份验
  • 如何更新 Dynamodb 中字符串集 (SS) 类型的项目?

    我创建了一个字符串集类型的属性 当我创建项目并分配 SS 类型的属性时 一切正常 但是当我尝试更新此属性时 数据类型更改为列表 L 我试试这个 qw new AWS DynamoDB DocumentClient var params Ta
  • jQuery UI Sortable/Draggable 导致窗口跳转

    在我的网页上 我有一个包含可排序 UL 的 div 每当我滚动到页面底部并拖动最后一个 LI 时 我的页面就会跳转 并且滚动条 在整个窗口上 会增长 这是一个几乎解决了我的问题的 JSFiddle www jsfiddle net u5af
  • 在Django中的特定时间执行任务

    我必须在用户指定的特定时间执行任务 这不会是固定时间 它将根据用户 到时候我就得执行我的任务了 为了实现这一点 我尝试使用 django cron 也尝试使用 django crontab 但在这两种情况下 我们都必须在中指定 cron 详
  • 源代码控制政策

    我正在寻找不同源代码控制策略的概述 我只了解到主线政策 并希望在加入团队之前更好地了解其他政策 有人可以提供一个概述的链接 甚至给我一些政策名称 以便我可以启动谷歌吗 没有空的提交消息
  • 如何将内存流加载到 LibVLC 中?

    我想使用 LibVLC 播放内存流中的媒体文件 如下所示 Ideally it would go like this LibVLC MediaFromStream new MemoryStream File ReadAllBytes Fil
  • Javascript 中的通配符字符串比较

    假设我有一个包含许多字符串的数组 称为 birdBlue birdRed 和其他一些动物 比如 pig1 pig2 现在我运行一个 for 循环来遍历数组并应该返回所有鸟类 这里什么样的比较才有意义 Animals bird 这是我的第一个
  • 什么时候应该真正使用访问者模式

    在文中我什么时候应该使用访客设计模式 https stackoverflow com questions 255214 when should i use the visitor design pattern第一个回答者指出 现在我们要向层
  • 使用airflow hive操作符并输出到文本文件

    您好 我想使用气流 hive 运算符执行 hive 查询并将结果输出到文件 我不想在这里使用 INSERT OVERWRITE hive ex HiveOperator task id hive ex hql sql hive ex sql