airflow 文档学习(二) - 概念

2023-05-16

1. 核心功能

1.1 DAGs

有向无环图

反映所涉及的task的依赖关系

注:搜索dag的时候,airflow只会关注同事包含"DAG"和"airflow"字样的py文件

1.2 scope

airflow将加载任何可以从DAG file中import的DAG对象,但是它们必须出现在globals()中,例如下面的文件,只有tag_1会被加载,tag_2只会出现在本地scope中

dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()

1.3 Default Arguments

如果一个字典default_args被传给一个DAGs,它将会将其运用到所有的它的operator中。这使得复用default_args变得非常的方便

1.4 Context Manager

dags可以被当做一个管理器,去自动的分配新的operators给dag

1.5 Operators

dags描述的是怎么去跑一个工作流,operators决定实际做什么。

一个operator描述了在一个工作流中的单个task。operators经常但不总是原子的,这意味着他们可以独立存在而不需要去和别的operator分享资源。DAG将确保operators以正确的顺序运行,在这些依赖之外,operator通常是独立运行的,甚至他们肯能运行在不同的机器上。

这是个非常微妙的关键点:事实上,如果两个operator需要去共享一些信息,就像文件名或者一些小的数据,你应该去考虑将他们合并到一个operator中,如果上述情况确实是无法避免的,airflow有operator的交叉通信(xcom)在文档中有描述。

并且airflow提供了非常多的通用operator:

BashOperator 
PythonOperator 
EmailOperator
SimpleHttpOperator 等等

1.6 DAG Assignment

operator不用立马被分配给一个dag,但是,一旦operator被分配给了一个dag,它将无法被转移或者是取消分配。dag的分配在operator被创建之后可以被非常明确的完成,通过延期分配或者从其他operator推断的方式

例如下面的方式:

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

1.7 Bitshift Composition

在以前,operator的关系描述是通过set_upstream()和set_downstream()方法,在1.8 之后可以通过<>代替依赖方法。

1.8 Tasks

当一个operator被实例化之后,它就被称为是一个task。实例化在调用抽象operator时定义了具体的值,同时,参数化之后的task会称为dag的一个节点。

1.9 Task Instances

一个task实例代表一个task的特定运行,其特征在于dag、任务、和时间点的组合。

它拥有运行状态:running、success、failed、skipped、up for retry等

1.10 Workflows

通过组合dags和operators,你会创建TaskInstances,你可以创建复杂的工作流。

2. Additional Functionality

2.1 Hooks

Hooks是连接一些平台和数据库的接口,类似于 Hive, S3, MySQL, Postgres, HDFS, Pig。hooks尽可能的实现了通用接口,并且充当operator。还需要使用airflow.models.Connection 模型来检索主机名和身份认证信息,hooks将身份认证信息和代码放在管道之外,集中在元数据库中。

2.2 pools

一些系统会因为太多的进程不堪重负,airflow的pool可以被用作限制任意的task的运行。task可以在创建时通过参数指定存在的pool名称。

例如:

aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

pool中可以使用priority_weight参数去定义他的在队列中的权重,并且决定哪个task先行执行。

当容量被撑满时,task将会放入计划执行,一旦有容量,可运行的task和他们的状态将会被在前端展示,当插槽空闲,队列中的task将会基于权重进行排序执行。

2.3 Connections

外部系统的connection信息被存储在airflow的元数据库中,airflow的管道可以很简单地引用被集中管理的conn_id,无需另外进行操作。

当许多的connections被定义在同一个conn_id下,在这种情况下,当hooks使用get_connection方法时,airflow将随机选择一个connection,当重试时允许一些基本的负载均衡和容错。

一些hooks有默认的conn_id,当operators使用这个hook时不需要一个明确的conn_id。例如:PostgresHook的默认conn_id是postgres_default

2.4 Queues

当我们使用CeleryExecutor时,被塞入celery队列的task是可以被规定的,队列是BaseOperator的属性,所以任何task可以被分配到任何的队列中,而默认的队列环境是在配置文件的celery下的default_queue中配置的。

workers可以监听一个或多个队列中的task,当一个worker启动的时候(使用airflow worker命令),一个以逗号分隔的对列名可以被指定(airflow worker -q spark),这个worker将只会选择那些被连接到指定对列的task。

2.5 XComs

XComs使得tasks可以交换信息,允许更加细微的控制形式和分享状态。XComs原则上定义成key、value和timestamp,但是也可以跟踪一些属性,例如创建XCom的task/DAG。

XComs可以被pushed或者pulled,当一个task发送一个xcom,这个xcom是普遍可获得的。task可以被发送通过使用方法xcom_push(),此外,当一个task返回一个值时(不管是operators的execute()方法还是PythonOperators的python_callable方法),一个包含着返回值的xcom会自动发送。

tasks调用xcom_pull()去接受xcoms,可选择的根据key、task_ids、dag_id进行过滤。默认的,xcom_pull()在获得值时会根据keys自动筛选执行方法。

如果xcom_pull被传了一个task_id,则对应task最近一次的xcom值会被返回,如果一组task_ids传过去,会返回一组对应的xcom值

也可以直接在模板中获取xcom,例如:SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

值得注意的是,xcom与variable非常相似,但它是专门用于任务之间的通信而不是全局设置

2.6 Variables

variables是一种传统的方式去存储和取回任意的内容或者是key-value形式的airflow的设置,它可以在前端界面、代码或者cli中进行增删改查的操作,当你定义管道代码,就可非常方便的使用,例如:

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)

你可以使用variables在一个jinjia模板中:

echo {{ var.value.<variable_name> }}

2.7 Branching

有时候你需要你的工作流进行分支,或者是根据任意上游发生的条件走某条路。这其中一种实现方法就是使用BranchPythonOperator

BranchPythonOperator和PythonOperator十分相似,除了python会期望一个python_callable去返回一个task_id,返回的task_id跳过所有其他路径,python的返回函数的task_id必须直接引用BranchPythonOperator任务下游的任务。

注意,在BranchPythonOperator中使用depend_on_past = True下游的任务在逻辑上是不合理的,因为跳过状态总是导致依赖于过去成功的块任务。如果非要这样的话可以中间建立一个虚拟任务进行过度。

2.8 SubDAGs

2.9 SLAs

记录失败的过错的sla任务列表

2.10 Trigger Rules

虽然正常的工作流行为是在所有的直接上游任务成功之后触发的,但是airflow允许更为复杂的依赖项。

所有的operators有一个trigger_rule,用来定义生成的任务被触发的规则,trigger_rule的默认参数是all_success,以下为别的参数解释:

all_success: (default) 所有的父级任务成功
all_failed: 所有的父级任务失败,或者上游状态为失败
all_done: 所有的父级任务执行完成
one_failed: 至少一个失败,并且不会等待所有任务执行完成
one_success: 至少一个成功,并且不会等待所有任务执行完成
dummy: 依赖只是为了展示,随意触发

注意,这些可以与depends_on_past结合使用,当设置为true时,如果任务的先前计划未成功,则不会触发

2.11 Latest Run Only

标准工作流行为涉及为特定日期/时间范围内运行的一系列任务,但是,某些工作流执行的任务与运行时间无关,但是需要按计划运行,就像标准的cron作业一样,在这些情况下,暂停期间错过的回填运行作业会浪费cpu周期。

2.12 Zombies & Undeads

僵尸任务的特点是没有心跳(由工作定期发出)和数据库中的运行状态,当工作节点无法访问数据库的时候,airflow进程在外部被终止或者节点重启的时候,他们可能会发生。僵尸查杀由调度程序的进程定期执行。

undead进程的特点是存在进程和匹配的心跳,但是airflow不知道此任务在数据库中运行。这种不匹配通常在数据库状态发生改变的时候发生,最有可能是通过删除UI中的任务实例视图中的行,指示任务验证其作为心跳例程的一部分的状态,并在确定他们处于这种不死的状态时终止自身。

2.13 Cluster Policy

你本地airflow设置文件可以定义一个策略功能,该功能可以根据其他任务或DAG属性改变其任务属性。

2.14 Documentation & Notes

2.15 Jinja Templating

转载于:https://www.cnblogs.com/ZhangShY/p/10102210.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

airflow 文档学习(二) - 概念 的相关文章

随机推荐

  • 《算法导论》第8章 线性时间排序(计数排序、桶排序、基数排序,以及比较排序算法的下界)

    关于比较排序算法的下界 xff1a 显而易见 xff0c 对于一个含有n个数字的数组 xff0c 他们的排列方式有n n 1 n 2 n 3 61 n xff01 种 对于其中仅存的一种正确的排序方式 xff0c 我们需要做的事情是将剩下的
  • 生产者消费者操作系统实验报告用C语言来实现

    span class token macro property span class token directive hash span span class token directive keyword include span spa
  • Windows下seata报错为can not register RM,err:can not connect to services-server的解决办法,亲测有效。

    nacoc下的服务 xff1a 点开seata的TC服务看一下里面的地址 xff0c 左下角为你该服务的ip地址 上图我的ip地址是我目前在用的校园网地址也就是你的桌面右下角的网络地址 我之前报错的原因呢是因为他这里的默认的ip是我之前虚拟
  • JS提取字符串的手机号或固话

    JS提取字符串的手机号或固话 座机 let str 61 39 座机号1 0451 1234567 座机号2 010 12345678 座机号3 4008208201 电话 17365040083转1号线 39 0451 010是地区区号
  • Arrays类及该类中的方法[java]

    基本概念 概述 xff1a 用于操作数组的工具类 xff0c 里面定义了常见操作数组的静态方法 1 Arrays类中的方法都是静态方法 xff0c 都是通过类名调用 2 都是静态方法 xff0c 也就是说明了Arrays中的方法没有一个是重
  • 2021版kali linux中文化教程

    kali的中文化 xff0c 一开始配置需要下载中科大的源 xff0c 打开 终端输入vim etc apt sources list xff0c 将内容变成如下 deb http mirrors ustc edu cn kali kali
  • windows向ubuntu虚拟机传输文件

    更新源 xff0c 更新过了就不用了 sudo apt get update 安装依赖项 sudo apt get install open vm 粘贴文件需要粘贴两次 xff08 我是用拖的 xff09 xff0c 第一次ubuntu会创
  • 从零开始的Spring6学习(一)

    新手上路 xff0c 诸多不足与错误 xff0c 请优雅指正 xff0c 轻喷 Spring6是一个里程碑的Spring框架 首先要明白为什么要出现框架 xff1a 在程序的开发中 xff0c 一直追求高内聚 xff0c 低耦合 xff0c
  • KindEditor中使用val()获取content内容后图片不显示

    场景 使用KindEditor进行图片上传后 xff0c 在js部分通过 val 获取内容后 xff0c 所获取的图片的 lt imgsrc 61 34 34 其中img标签与src连接在了一起导致图片不能显示 错误的数据库存取内容 xff
  • 微信登录显示连接失败,请检查网络

    背景 xff1a 最近公司网络不知道怎么回事 xff0c 显示连接失败 xff0c 请检查网络 最初解决方案 重新插拔网线 xff0c 就可以登陆微信了 xff0c 但是退出重登需要继续插拔网线 同事分享的解决方案 xff1a 1 右键以太
  • Property or method “XXX“ is not defined on the instance but referenced during render.解决方法

    在做Vue项目时 xff0c 有时候会看到这个警告 这里是因为vue检测到了 count 这个属性有被使用 xff0c 但是未定义 xff0c 页面虽然能够显示但是有红色信息总归是不好看的 xff0c 解决方法如下 xff1a 我们只需要在
  • 如何用Idea 创建Spring项目

    如何用IntelliJ Idea创建一个简单的Spring项目呢 xff0c 刚入门的人可能不太懂 xff0c 那我就来简单分享一下吧 第一步 点击新建一个maven项目 xff0c 点击下一步 xff08 不用选择从原型创建 xff09
  • OpenCV初尝试13——图像特征

    13 图像特征 13 1 Harris角点检测 Harris角点检测的思想是通过图像的局部的小窗口观察图像 xff0c 角点的特征是窗口沿任意方向移动都会导致图像灰度的明显变化 Harris角点检测的数学原理较为繁琐 xff0c 直接上个链
  • 华为secoclient客户端安装

    下载安装包 xff0c 右键以管理员身份运行 2 安装过程中一律选择同意 是 xff0c 完成后打开软件 点击标注处 3 输入网关地址和端口号 xff0c 选择添加 4 添加完成后确定并点击连接 xff0c 输入用户名 密码 xff0c 记
  • maven配置连接MySQL数据库

    2019年7月9号 问题 xff1a maven项目中连接不上mysql数据库 问题 xff1a maven项目中连接不上mysql数据库 从昨晚调bug一直调到今天上午 xff0c 昨晚发现了是maven项目中mysql数据库连接的问题
  • python学习:最适合初学者的8本Python书籍

    Python是最友好的编程语言之一 xff0c 也因此成为初学者的首选 xff0c 为了帮助你更好更快的上手Python xff0c 并学会使用Python进行编程 xff0c 本文我们为初学者分享了最好的Python书籍 希望能够对你有所
  • 最适合Python入门到大牛必看的7本书籍,一定要收藏!

    Python零基础应该阅读哪些书籍 xff1f 我推荐这三本书 1 Python学习手册 xff08 第4版 xff09 以计算机科学家一样的思维方式来理解Python语言编程 xff0c 实用的学习指南 xff0c 适合没有Python编
  • 前端开发:深入使用proxy代理解决前端跨域问题

    在前端领域里面 xff0c 跨域指的是浏览器允许向服务器发送跨域请求 xff0c 进而克服Ajax只能同源使用的局限性限制 同源策略是一种约定 xff0c 而且是浏览器中最基本也是最核心的安全功能 xff0c 若缺少了该策略 xff0c 浏
  • 手工搭建Servlet实现

    现在作为一个Java程序员 xff0c 我们已经习惯了使用IDE和Web框架进行开发 xff0c IDE帮助我们做了编译 打包的工作 Spring框架则帮助我们实现了Servlet接口 xff0c 并把Servlet容器注册到了Web容器中
  • airflow 文档学习(二) - 概念

    1 核心功能 1 1 DAGs 有向无环图 反映所涉及的task的依赖关系 注 xff1a 搜索dag的时候 xff0c airflow只会关注同事包含 34 DAG 34 和 34 airflow 34 字样的py文件 1 2 scope