前言
随着数据分析系统的逐渐完善,现今一套成熟的数据分析系统都是大量 任务单元 组成,且各个任务单元之间存在 时间或者依赖关系,为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。Azkaban作为一套成熟,轻量且功能完善的任务调度系统,可以胜任的充当任务调度的工作。
一、Azkaban是什么?
上图是Azkaban官网首页,Azkaban是一个 Open-source Workflow Manager ,开源的工作流管理系统。Azkaban能够将由包含时间和依赖关系的各个工作单元组成的工作流,按照关系生成调度任务计划,然后按照计划顺序执行工作流的各个任务。另外,Azkaban还为工作流提供负载均衡,失败重试等等机制来确保任务的执行成功。
二、Azkaban与其他的工作流调度系统
1.简单的任务调度系统
- Linux的Crontab
- java web 的 springSchedule
简单的任务调度系统,一般只能确定本任务的执行频率。无法按照依赖关系进行 “接力” 的形式执行工作流,只能预估当前任务所依赖上一个任务执行完成的时间,从而确定当前任务的时间,当任务依赖关系复杂时无法满足要求。
2.复杂的任务调度
- Azkaban 轻量,功能够用,调度可靠,支持 Web页面操作
- Oozie 较Azkaban重量,功能齐全,但是配置繁重,在CDH环境中可以使用 Web页面操作较友好
- Airflow python语言开发,需要了解python
- DolphinScheduler 国产系统,支持Web,流行度高
复杂任务调度系统,可以很好的解决存在时间和依赖关系的任务工作流。Azkaban作为老牌任务调度系统,开源轻量且稳定。
三、Azkaban架构
Azkaban由三部分构成:
-
Relational Database(Mysql)
azkaban将大多数状态信息都存于MySQL中,Azkaban Web Server 和 Azkaban Executor Server也需要访问DB。
-
Azkaban Web Server
提供了Web UI,是用户操作Azkaban的入口,也是Azkaban任务调度的主要管理者,包括 project 的管理,认证,调度,对工作流执行过程的监控等。
-
Azkaban Executor Server
调度工作流和任务,纪录工作流活任务的日志,之所以将AzkabanWebServer和AzkabanExecutorServer分开,主要是因为在某个任务流失败后,可以更方便的将重新执行。而且也更有利于Azkaban系统的升级
四、Azkaban配置
-
azkaban.project
由于Azkaban解析flow格式的Flow-Api有新旧两个版本,分别是 .propertise 和 .yml。所以再编写核心的配置的时候,需要先指定配置文件类型,一般采用新的Flow-API方式解析flow文件(.yml),即 azkaban-flow-version: 2.0
-
basic.flow
配置任务工作流的核心配置,后面以 azkaban.project 配置采用新的Flow-API方式解析flow文件 为例
basic.flow 基础配置
nodes: 工作流数组
----------------------------- 基础参数 ----------------------------
1. name: job名称
Type: job类型,command(命令), javaprocess(java程序)
Config: job的配置
command: 任务启动时执行的命令
------------------------------------------------------------------
---------------- 自动重试(web页面可设置手动重试) -----------------
retries: 重试次数
retry.backoff: 重试的时间间隔
--------------------------------------------------------------
------------------------ javaprocess -------------------------
Xms: 最小堆内存
Xmx: 最大堆内存
classpath: 类路径
java.class: main()全类名,jar包与flow文件同包上传时,可使用相对路径
main.args: main方法的参数
--------------------------------------------------------------
basic.flow条件工作流
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job的父Job输出的运行时参数构成,也可以使用预定义宏。
- 条件为当前Job的父Job输出的运行时参数
1)基本原理:
-
(1)父Job将参数写入
JOB_OUTPUT_PROP_FILE 环境变量所指向的文件
-
(2)子Job使用
${jobName:param} 来获取父Job输出的参数并定义执行条件
2)条件运算符:
-
同 java运算符,==,!=,>,<,&&,||,! 等
# 需求
# JobA执行一个shell脚本。
# JobB在JobA完成后执行,但JobB不需要每天都执行,而只需要每个周一执行。
#(1)新建JobA.sh
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE # 将结果写入变量
#(2)新建JobB.sh
#!/bin/bash
echo "do JobB" # 执行 jobB
#(3)新建condition.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1 # 获取变量并判断条件
#(4)将JobA.sh、JobB.sh、condition.flow和azkaban.project打包成condition.zip
#(5)创建condition项目=》上传condition.zip文件=》执行作业=》观察结果
#(6)按照我们设定的条件,JobB会根据当日日期决定是否执行。
- 条件为预定义宏
Azkaban中预置了几个特殊的判断条件,称为预定义宏。预定义宏会根据所有父Job的完成情况进行判断,再决定是否执行。
可用的预定义宏如下:
-
(1)
all_success : 表示父Job全部成功才执行(默认)
-
(2)
all_done :表示父Job全部完成才执行
-
(3)
all_failed :表示父Job全部失败才执行
-
(4)
one_success :表示父Job至少一个成功才执行
-
(5)
one_failed :表示父Job至少一个失败才执行
# 需求
# JobA执行一个shell脚本
# JobB执行一个shell脚本
# JobC执行一个shell脚本,要求JobA、JobB中有一个成功即可执行
#(1)新建JobA.sh
#!/bin/bash
echo "do JobA" # 执行 jobA
#(2)新建JobC.sh
#!/bin/bash
echo "do JobC" # 执行 jobC
#(3)新建macro.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success # 预定义宏判断
#(4)JobA.sh、JobC.sh、macro.flow、azkaban.project文件,打包成macro.zip(没有JobB.sh)
#(5)创建macro项目 =>上传macro.zip文件 => 执行作业 => jobA成功, JobB失败,而JobC依然可以执行
五、邮件报警
Azkaban作为一个任务调度系统,对于任务执行情况的监控不只体现在 Web页面展现,还提供原生的邮件报警支持,只需要简单的配置,就可以将 指定的任务 执行成功或失败的消息,通过邮件发送给开发者。
- 在azkaban-web节点上,编辑 conf/azkaban.properties ,配置发件方信息
#这里设置邮件发送服务器,需要 申请邮箱,切开通stmp服务,以下只是例子
mail.sender=tang@163.com
mail.host=smtp.163.com # 邮箱的smtp服务器地址
mail.user=tang@163.com
mail.password=xxx # 邮箱的授权码
-
需要重启 azkaban-web
-
配置收件方信息,选择工程 => Excuoter Flow => Notifaction => Failure/Success Emails