如何将 SQL 作为带有参数的文件传递给 Airflow Operator

2023-12-23

我在 Airflow 中有一个操作员:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

现在,我需要运行的实际查询有 24 行长。我想将其保存在一个文件中,并向操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理 SQL 所需的参数。

建议?

编辑: 这是我的代码:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

这给出:

jinja2.exceptions.UndefinedError:“templates_dict”未定义


正如您所注意到的,MySqlToGoogleCloudStorageOperator https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/mysql_to_gcs.py#L43指定一个template_ext带有 .sql 扩展名。

首先在你的Dag,指定放置 .sql 文件的路径

dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])

在 yourfile.sql 中放入您的大型查询。注意params.ord_id

SELECT * FROM orders where orderid> {{ params.ord_id }}

现在在sql运算符的参数,传递文件的名称。

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

重要的是不要在该文件名后面添加空格。这是因为 Jinja 模板引擎将查找以结尾的字符串.sql如果是,它会将其视为文件而不是字符串。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 SQL 作为带有参数的文件传递给 Airflow Operator 的相关文章

随机推荐

  • 分步更改 iPad Pro 的字体大小

    我有一个应用程序 仅适用于横向模式下的 iPad 屏幕设计完全在 IB 中通过自动布局完成 现在我想实现以下行为 在 iPad Pro 12 英寸上时 所有标签的字体大小应为 48 对于所有较小的 iPad 尺寸 字体大小应为 32 我在
  • 无法将foreign_key_checks设置为0/关闭

    我有一个小数据库 我刚刚添加了一些表并设置了外键约束 现在我想上传一些数据并阅读了有关使用以下内容暂时关闭检查的信息 SET FOREIGN KEY CHECKS 0 我正在从 SQL 窗口运行此查询 我收到一条 成功 消息 但是当我检查设
  • Flex 3中如何防止组件被拖出舞台

    我认为这个问题有一个简单的解决方案 只是不够简单让我找到它 问题 如何限制 Flex 3 中的 TitleWindow 被拖离屏幕 舞台 有没有办法将TitleWindow限制在查看区域 示例 假设我有一个占据 100 屏幕的应用程序 接下
  • 在 Python 中加载与 Jinja2 嵌套的 YAML

    我有一个 YAML 文件 all yaml 看起来像 var1 val1 var2 val2 var3 var1 var2 txt 如果我像这样在 Python 中加载它 import yaml f open all yaml dataMa
  • 获取导致异常的异常描述和堆栈跟踪,全部作为字符串

    如何转换捕获的Exception 其描述和堆栈跟踪 到str外用 try method that can raise an exception params except Exception as e print complete exce
  • OL3:缩放到地图上的矢量图层

    我有一张带有 openlayers 3 和矢量图层的地图 我想将地图调整为该矢量图层的大小 但到目前为止 我所能得到的只是将地图集中在该矢量的最后一个点上 因为在创建地图时无法访问矢量图层的点 if trackMap null for va
  • GWT RPC - 每个应用程序多个 RPC 服务

    我目前正在使用一个具有大型 RPC 服务的 GWT 应用程序 它有 100 多个方法 所有方法都做不同的事情 如果我将其拆分为多个 RPC 服务 我会获得什么样的性能优势 障碍 我相信我必须为每一个创建一个新的 servlet 所以我的主要
  • iframe onmouseout 捕获

    目前 我的父页面中有一个 iframe 我想知道当用户单击或移动到 iframe 边框之外 即返回父页面 时 是否可以捕获 onmouseout 事件来捕获 您应该能够在 iframe 页面中执行 document body onmouse
  • 圆角桌 LESS

    经过一番挖掘后我发现this https stackoverflow com questions 628301 css3s border radius property and border collapsecollapse dont mi
  • 当我们不需要主外键关系就可以加入时,为什么还需要主外键关系?

    当我们不需要主外键关系就可以加入时 为什么还需要主外键关系 test1 id lname fname dob no primary and foreign key and not unique no constraints test2 id
  • JEP 145 发生了什么(由于编译代码重用,jvm 启动速度更快)?

    在2012年 捷普 145 http hg openjdk java net jep jeps rev a16daa94ba0f创建的目的是 缓存编译后的本机代码在java中用于更快的 jvm 启动 当时 它已被正式宣布 https twi
  • Tensorflow,在 RNN 中保存状态的最佳方式?

    我目前有以下代码 用于张量流中一系列链接在一 起的 RNN 我没有使用 MultiRNN 因为我稍后要对每一层的输出做一些事情 for r in range RNNS with tf variable scope recurent d r
  • 适用于 iOS15 和 iOS16 的 NavigationLink

    我正在开发一个以 iOS 15 作为最低目标的应用程序 这意味着它还需要支持较新版本的 iOS 对于屏幕导航 我使用 NavigationView NavigationLink destination isActive label 我正是需
  • 无法在 Visual Studio 2013 中同步 Git

    我正在与另一位开发人员合作 我们似乎陷入了 GIT 困境 我定期提交代码并定期推送到远程主机 我的同事 虽然是一个很棒的人和开发人员 还没有养成这样做的习惯 当我今天早上去 Pull the Head 修订时 这是我的习惯 我遇到了以下错误
  • Asp.net MVC 核心中 Ajax.BeginForms 的替代方案是什么

    我认为在 asp net MVC 核心中 我们不再可以选择使用 Ajax BeginForms 那么 Ajax BeginForm 的替代方案是什么 您可以使用内联data ajax 属性
  • ASP.NET LINQ 查询用于过滤和循环多个表

    我有两个单独的域模型类 App 和 AgeGroup App 类包含一些基本的整数和字符串属性 AgeGroup 类也是如此 我想要实现的是 AppOrder 及其属性的所有应用程序的 JSON 输出 嵌套在按其 GroupOrder 属性
  • JQuery - 查找任何级别的所有后代,但不查找这些后代的后代

    问题 我正在尝试使用 JQuery find 查找元素内的所有后代在任何级别具有给定属性的后代 但不是具有相同属性的后代的后代 帮助理解 JQuery 下面查询的预期目标是查找元素内的所有后代 some id 在任何级别 有some att
  • 如何在 PHP 中检查域名服务器?

    我需要检查域名服务器正在使用什么 但在 PHP 中找不到正确的解决方案 我努力了checkdnsrr and dns get record 对于某些正在运行的域 它们都不会显示 NS Whois也不是解决办法 我的目的是过滤已设置名称服务器
  • 当您使用 C++ 中的 asm 代码操作寄存器时,会发生什么情况?

    一些代码 int x 1 for int i 1 i lt 10 i x i asm mov eax x 如果这个程序使用eax为了增加价值i 当我操纵时会发生什么eax 编译器会保存之前的寄存器吗 asm在执行 asm 代码后调用并使用它
  • 如何将 SQL 作为带有参数的文件传递给 Airflow Operator

    我在 Airflow 中有一个操作员 import orders op MySqlToGoogleCloudStorageOperator task id import orders mysql conn id con1 google cl