气流中的execution_date:需要作为变量访问

2024-04-17

我真的是这个论坛的新手。但有一段时间,我一直在为我们公司玩气流。抱歉,如果这个问题听起来很愚蠢。

我正在使用一堆 BashOperators 编写一个管道。 基本上,对于每个任务,我想简单地使用“curl”调用 REST api

这就是我的管道的样子(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime
                                  
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['[email protected] /cdn-cgi/l/email-protection'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

如果你注意到我在做current_datetime= datetime_obj.now(tz=tz.tzlocal())相反,我想要的是'执行日期'

我该如何使用'执行日期'直接并将其分配给我的 python 文件中的变量?

我遇到了访问参数的普遍问题。 任何帮助将不胜感激。

Thanks


The BashOperator's bash_command argument is a template。您可以访问execution_date在任何模板中作为datetime object使用execution_date多变的。在模板中,您可以使用任何jinja2操纵它的方法。

使用以下内容作为您的BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

如果您只想要相当于执行日期的字符串,ds将返回日期戳 (YYYY-MM-DD),ds_nodash返回相同的内容,但不带破折号 (YYYYMMDD) 等。更多信息macros可以在Api Docs https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html.


您的最终运算符将如下所示:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

气流中的execution_date:需要作为变量访问 的相关文章

  • 如何从气流传感器中提取 xcom 值?

    主要问题 我正在尝试创建一个 BigQuery 表 如果不存在 方法 使用 BigQueryTableSensor 检查表是否存在 并根据返回值 使用 BigQueryCreateEmptyTableOperator 创建或不创建新表 Pr
  • 气流中任务的粒度

    对于一项任务 有许多辅助任务 从文件 数据库获取 保存属性 验证 审核 这些辅助方法并不耗时 一个样本 DAG 流 fetch data gt gt actual processing gt gt validation gt gt save
  • 气流日志文件不存在:

    Airflow 在几周内工作正常 但突然开始出现几天错误 Dags 会因此错误而随机失败 日志文件不存在 airflow path 1 log获取自 http 8793 airflow path 1 log 无法从工作人员获取日志文件 对
  • 创建 dag run 时将参数传递给 Airflow Experimental REST api

    看起来 Airflow 有一个实验性的 REST api 允许用户使用 https POST 请求创建 dag 运行 这太棒了 有没有办法通过 HTTP 将参数传递给 create dag 运行 从官方文档来看 发现here https a
  • Airflow BashOperator 日志不包含完整输出

    我遇到一个问题 BashOperator 没有记录 wget 的所有输出 它只会记录输出的前 1 5 行 我已经尝试过仅使用 wget 作为 bash 命令 tester BashOperator task id testing bash
  • 如何定义 Apache Airflow DAG 的超时?

    我使用的是 Airflow 1 10 2 但 Airflow 似乎忽略了我为 DAG 设置的超时 我正在使用以下命令为 DAG 设置超时期限dagrun timeout参数 例如 20 秒 我有一个需要 2 分钟才能运行的任务 但 Airf
  • 如何管理气流 dag 之间的 python 包?

    如果我有多个气流 dags 以及一些重叠的 python 包依赖项 我如何保留每个项目 deps 脱钩 例如 如果我在同一台服务器上有项目 A 和 B 我会用类似的东西运行它们 source path to virtualenv a act
  • Airflow DAG动态结构

    我正在寻找一个可以决定 dag 结构的解决方案当 dag 被触发时因为我不确定我必须运行的操作员数量 请参阅下面我计划创建的执行顺序 Task B 1 Task C 1 Task B 2 Task C 2 Task A Task B 3 g
  • 清除后气流强制重新运行上游任务,即使下游任务标记为成功

    我在 Airflow 中有任务 A gt B gt C 当我运行 DAG 并全部成功完成时 我希望能够单独清除 B 同时将 C 标记为成功 B 清除并进入 no status 状态 但当我尝试重新运行 B 时 什么也没有发生 我尝试过 ig
  • Airflow - 跳过未来的任务实例而不更改 dag 文件

    我有一个 DAG abc 计划在每天上午 7 点 美国中部标准时间 运行 并且该 DAG 中有任务 xyz 由于某种原因 我不想为明天的实例运行任务 xyz 之一 如何跳过该特定任务实例 我不想对代码进行任何更改 因为我无权访问 Prod
  • 我无法通过 BashOperator xcom_push 参数

    我是 Airflow 的 xcom 功能的新手 我用 PythonOperator 尝试了它 它工作得很好 即 我可以从上下文中推送和提取值 但是当我在 BashOperator 上尝试它时 它不起作用 但是 我可以通过在任务创建期间添加
  • 我可以通过编程方式确定 Airflow DAG 是计划的还是手动触发的?

    我想创建一个片段 根据 DAG 是计划的还是手动触发的来传递正确的日期 DAG 每月运行一次 DAG 根据上个月的数据生成报告 SQL 查询 如果我运行预定的 DAG 我可以使用以下 jinja 片段获取上个月的数据 execution d
  • Airflow:只有一项任务的简单 DAG 永远不会完成

    我制作了一个非常简单的 DAG 如下所示 from datetime import datetime from airflow import DAG from airflow operators bash operator import B
  • 还有一个“此 DAG 在网络服务器 DagBag 对象中不可用”

    这似乎是一个相当普遍的问题 我有一个 DAG 我不仅可以手动触发它airflow trigger dag 但它甚至按照其时间表执行 但拒绝显示在 UI 中 我已经多次重新启动网络服务器和调度程序 按 刷新 十亿次 然后运行它airflow
  • 在 Airflow 中编写和导入自定义插件

    这实际上是两个问题合二为一 My AIRFLOW HOME结构如下 airflow dags plugins init py hooks init py my hook py another hook py operators init p
  • 为每个文件运行气流 DAG

    所以我在airflow中有一个非常好的DAG 它基本上在二进制文件上运行几个分析步骤 作为airflow插件实现 DAG 由 ftp 传感器触发 该传感器仅检查 ftp 服务器上是否有新文件 然后启动整个工作流程 所以目前的工作流程是这样的
  • 如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3?

    我一直在使用 Postgres to S3 运算符将数据从 Postgres 加载到 S3 但最近 我必须导出一个非常大的表 并且我的 Airflow Composer 失败 没有任何日志 这可能是因为我们正在使用 Python 临时文件模
  • 如何获取使用 Dataproc 工作流模板提交的 jobId

    我已在 Airflow 操作员的帮助下使用 Dataproc 工作流模板提交了 Hive 作业 DataprocWorkflowTemplateInstantiateInlineOperator https airflow readthed
  • 气流 - 未知的蓝色任务状态

    我刚刚收到一个蓝色任务 该任务没有出现在状态图例中 我很好奇这是一个错误还是未记录的状态 正如您所看到的 蓝色没有显示在右侧的潜在状态列表中 我刚刚完成了所有过去 未来和上游尝试的清理 仅供参考 这是一个已知的 TaskInstance 状
  • Airflow 默认连接数过多

    我打开气流并检查连接 发现其后面运行的连接太多 关于如何杀死那些我不使用的任何想法 或者我很想知道运行它的最小 conn id 建筑学 LocalExecutor 与其他经纪人不同 Postgres 作为元数据库 但它列出了 17 个连接

随机推荐