Azkaban介绍
常见的开源调度框架:
- Linux Crontab:针对个人用户及小任务量
- Apache Oozie:功能强大,配置复杂的Hadoop任务调度框架
- Azkaban:开源的工作流管理器,轻量级调度框架
- AirFlow:基于Python开发的通用批处理调度框架
- Zenus:阿里开源的基于Hadoop的工作流调度系统
- EasyScheduler:国内开源的分布式工作流任务调度系统
开源调度框架对比:
Azkaban简介:
- Linkedin公司开源的分布式批量工作流任务调度器
- 通过简单的KV的方式,生成Job,并构建依赖关系
- 通过插件化的任务提交模块,支持可扩展的多任务提交
- 官方文档:https://azkaban.readthedocs.io/en/latest/
Azkaban优点:
- 可通过job配置文件,快速建立任务和任务之间的依赖关系
- 提供模块化和可插拔的插件机制,原生支持shell、 java、 hive等
- 基于Java开发,提供Ajax Api,易于二次开发
Azkaban适用场景:
- 通过Azkaban结合Datax实现定时的数据采集服务
- 通过Azkaban调度执行Shell、Java、 Hive、 Hadoop等 任务
- 开发可复用的程序,通过Azkaban编排成工作流,执行批处理任务
- 对Azkaban进行二次开发通过接口创建任务、调度任务、管理任务
- 将Azkaban作为数据平台的- -部分,提供任务调度的能力
- 基于Azkaban的异常处理、监控报警、审计日志完善数据平台功能
Azkaban架构与调度流程
Azkaban架构图如下:
- AzkabanServer:Azkaban的管理服务,提供WebUI,负责Project管理、权限管理、定时执行、跟踪进度、审计日志等等功能
- AzkabanExecutor:负责工作流的提交和执行,搜集执行日志,也就是具体干活的节点
- MySQL:存储工作流详情及节点和任务的状态信息等
其中AzkabanWebServer可以说是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。
同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。
并且Azkaban使用方便,Azkaban使用以.job
为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies
属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip
的方式通过Azkaban UI上传到Web服务器上。
Azkaban有三种部署模式:
- Solo mode:内置数据库,Server和Executor在同一个 进程中
- Two mode:基于Mysq|数据库,启动一个Server和一个Executor
- Multi mode:分布式模式,一个Server和多个Executor
Azkaban执行流程图:
- 用户通过界面或者API提交任务到Webserver,Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个合适的executor下发工作流;
- executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;
- 分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;
- 被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。
Azkaban核心交互流程:
- AzkabanServer主动调用Executor的API获取状态信息
- 根据计算规则选择执行的Executor Server(任务数量、内存和CPU等资源、最近分配的时间)
- 调度WorkFlow到Executor执行,Executor执行并监控任务
Azkaban安装部署
- Azkaban官网:https://azkaban.github.io
- 软件下载地址:https://github.com/azkaban/azkaban
- 官方插件地址:https://github.com/azkaban/azkaban-plugins
- 官方文档地址:https://azkaban.readthedocs.io/en/latest/getStarted.html
这里采用的是Two mode部署模式,因为Multi mode只不过是在该基础上部署了多个ExecutorServer,也就是说在Two mode基础上增加ExecutorServer节点就是Multi mode了。
编译Azkaban源码
首先,准备好Java和Maven:
[root@azkaban01 ~]# java -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)
[root@azkaban01 ~]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 1.8.0_261, vendor: Oracle Corporation, runtime: /usr/local/jdk/1.8/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@azkaban01 ~]#
安装一些工具:
[root@azkaban01 ~]# yum install -y git gcc-c++
然后从GitHub上拉取Azkaban的源码:
[root@hadoop01 ~]# cd /usr/local/src
[root@hadoop01 /usr/local/src]# git clone https://github.com/azkaban/azkaban.git
进入源码目录,在settings.gradle
文件的开头增加插件仓库配置:
[root@hadoop01 /usr/local/src]# cd azkaban
[root@azkaban01 /usr/local/src/azkaban]# vim settings.gradle
pluginManagement {
repositories {
maven {
url 'https://maven.aliyun.com/repository/gradle-plugin'
}
gradlePluginPortal()
}
}
...
然后修改build.gradle
文件中的仓库配置:
[root@azkaban01 /usr/local/src/azkaban]# vim build.gradle
buildscript {
repositories {
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
maven { url 'https://maven.aliyun.com/repository/google' }
maven { url 'https://maven.aliyun.com/repository/jcenter' }
}
...
}
...
allprojects {
apply plugin: 'jacoco'
repositories {
mavenLocal()
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
maven { url 'https://maven.aliyun.com/repository/google' }
maven { url 'https://maven.aliyun.com/repository/jcenter' }
}
}
gradle/wrapper/gradle-wrapper.properties
文件中会定义从远程下载gradle,如果下载不下来的话,可以通过别的方式下载,然后上传到相应的目录下,并在该文件指定从本地文件系统中加载gradle的安装包:
[root@azkaban01 /usr/local/src/azkaban]# vim gradle/wrapper/gradle-wrapper.properties
distributionUrl=file:///usr/local/src/gradle-4.6-all.zip
完成以上的修改后,就可以执行如下命令开始编译安装了:
[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test
打包编译的过程中,有可能会报如下错误:
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':azkaban-web-server:nodeSetup'.
> Could not resolve all files for configuration ':azkaban-web-server:detachedConfiguration1'.
> Could not download node-linux-x64.tar.gz (org.nodejs:node:8.10.0)
> Could not get resource 'https://nodejs.org/dist/v8.10.0/node-v8.10.0-linux-x64.tar.gz'.
> Read timed out
这是因为系统中没有安装NodeJS,而azkaban-web-server这个模块需要用到NodeJS来编译web代码。由于无法通过远程下载NodeJS的安装包就会报这个错。解决方式也简单,在系统中安装NodeJS即可。步骤如下:
[root@azkaban01 /usr/local/src/azkaban]# curl --silent --location https://rpm.nodesource.com/setup_14.x | bash -
[root@azkaban01 /usr/local/src/azkaban]# yum install -y nodejs
[root@azkaban01 /usr/local/src/azkaban]# npm -v
6.14.8
[root@azkaban01 /usr/local/src/azkaban]# node -v
v14.15.0
[root@azkaban01 /usr/local/src/azkaban]#
设置npm
使用淘宝镜像仓库:
[root@azkaban01 /usr/local/src/azkaban]# npm config set registry https://registry.npm.taobao.org
[root@azkaban01 /usr/local/src/azkaban]# npm config get registry
https://registry.npm.taobao.org/
[root@azkaban01 /usr/local/src/azkaban]#
打开azkaban-web-server模块下的build.gradle
文件,修改原本的仓库配置,并注释掉node相关的配置。如下所示:
[root@azkaban01 /usr/local/src/azkaban]# vim azkaban-web-server/build.gradle
buildscript {
repositories {
maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }
maven { url 'http://maven.aliyun.com/nexus/content/repositories/jcenter' }
maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
maven { url 'https://maven.aliyun.com/repository/google' }
maven { url 'https://maven.aliyun.com/repository/jcenter' }
mavenCentral()
}
...
}
...
//node {
// Version of node to use.
//version = '8.10.0'
// Version of npm to use.
//npmVersion = '5.6.0'
// Base URL for fetching node distributions (change if you have a mirror).
//distBaseUrl = 'https://nodejs.org/dist'
// If true, it will download node using above parameters.
// If false, it will try to use globally installed node.
//download = true
// Set the work directory for unpacking node
//workDir = file("${project.buildDir}/nodejs")
// Set the work directory where node_modules should be located
//nodeModulesDir = file("${project.projectDir}")
//}
然后重新执行打包编译命令:
[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test
最终打包编译成功:
此时在核心组件的build/distributions
目录下,可以看到打包好的安装包:
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-exec-server/build/distributions/
azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz azkaban-exec-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-web-server/build/distributions/
azkaban-web-server-0.1.0-SNAPSHOT.tar.gz azkaban-web-server-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-db/build/distributions/
azkaban-db-0.1.0-SNAPSHOT.tar.gz azkaban-db-0.1.0-SNAPSHOT.zip
[root@azkaban01 /usr/local/src/azkaban]#
安装部署Azkaban
解压安装包:
[root@azkaban01 /usr/local/src/azkaban]# mkdir /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-db/build/distributions/azkaban-db-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-exec-server/build/distributions/azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
[root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-web-server/build/distributions/azkaban-web-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban
为了查看方便,将解压后的目录重命名:
[root@azkaban01 /usr/local/src/azkaban]# cd /usr/local/azkaban/
[root@azkaban01 /usr/local/azkaban]# mv azkaban-db-0.1.0-SNAPSHOT/ azkaban-db
[root@azkaban01 /usr/local/azkaban]# mv azkaban-exec-server-0.1.0-SNAPSHOT/ azkaban-exec-server
[root@azkaban01 /usr/local/azkaban]# mv azkaban-web-server-0.1.0-SNAPSHOT/ azkaban-web-server
首先,到MySQL中创建azkaban
数据库,然后将azkaban-db目录下的create-all-sql-0.1.0-SNAPSHOT.sql
文件给导入到MySQL中:
create database azkaban;
use azkaban;
source /usr/local/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql
然后配置azkaban-exec-server:
[root@azkaban01 /usr/local/azkaban]# cd azkaban-exec-server/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim conf/azkaban.properties
# webserver的连接地址
azkaban.webserver.url=http://localhost:8081
database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100
由于azkaban-exec-server默认使用的是5.x版本的MySQL驱动,而我这部署的MySQL是8.x版本的,所以还得替换一下MySQL驱动包:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# rm -rf lib/mysql-connector-java-5.1.28.jar
启动azkaban-exec-server:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh
检查azkaban-exec-server进程是否正常运行:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# jps
2005 Jps
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# netstat -lntp |grep 1982
tcp6 0 0 :::35195 :::* LISTEN 1982/java
tcp6 0 0 :::36304 :::* LISTEN 1982/java
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]#
通过API手动激活Executor Server:
$ curl http://localhost:35195/executor?action=activate
接着配置azkaban-webserver:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# vim conf/azkaban.properties
database.type=mysql
mysql.port=3306
mysql.host=192.168.1.11
# MySQL8.x需要加时区参数,5.x则不需要
mysql.database=azkaban?serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=123456a.
mysql.numconnections=100
替换MySQL驱动包:
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# rm -rf lib/mysql-connector-java-5.1.28.jar
启动azkaban-webserver:
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh
检查azkaban-webserver进程是否正常运行:
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# jps
2201 Jps
2172 AzkabanWebServer
1982 AzkabanExecutorServer
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# netstat -lntp |grep 2172
tcp6 0 0 :::46136 :::* LISTEN 2172/java
tcp6 0 0 :::8081 :::* LISTEN 2172/java
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]#
使用浏览器访问webserver的页面,会进入到登录页,默认的用户名和密码都是azkaban
:
- webserver的用户相关配置可以在
conf/azkaban-users.xml
文件中修改
登录成功进入到首页,如下:
提交Azkaban任务
关于Job的官方文档:
Azkaban工作流:
- Project:Azkaban的抽象概念,项目。一个Project包括多个Flow
- Flow:流程,一个Flow包含多个Job及Job的依赖关系
- Job:具体的任务,有command、java、hive、hadoopJava等 类型
Azkaban任务类型:
- Azkaban拥有独立的plugins仓库,需对其进行编译
- 不同的Job plugin是建立在command的基础之.上
- Command类型是万能的Azkaban任务类型,因为通过command调用shell脚本,就可以在shell脚本里实现任意操作
单个任务
我们来通过WebServer的可视化界面提交一个最简单的command任务,首先创建任务定义文件:
$ vim cmd_test.job
type=command
command=sh job1.sh
编写一个简单的shell脚本:
$ vim job1.sh
#!/bin/sh
echo "hello azkaban"
将这两个文件打成一个zip包:
到WebServer页面上创建一个Project:
上传压缩包:
上传成功后,点击“Execute Flow” -> “Schedule”,通过配置crontab
表达式定义调度的时间:
配置好表达式点击“Schedule”后,可以在“Scheduling”看到正在调度的任务:
点击“Flow”下的“cmd_test”,可以查看该任务的执行情况:
多个任务
以上演示了单个任务的定义、提交和调度,接下来演示下多个任务的定义、提交和调度,并且这多个任务之间还存在依赖关系,也就是任务之间的调度存在先后顺序。首先,创建任务文件:
$ vim job1.job
type=command
command=sh job1.sh
----------
$ vim job2.job
type=command
command=sh job2.sh
# 依赖job1,当job1调度执行完才会执行job2
dependencies=job1
----------
$ vim job3.job
type=command
command=sh job3.sh
dependencies=job1
----------
$ vim job4.job
type=command
command=sh job4.sh
dependencies=job2,job3
编写与任务对应的shell脚本:
$ vim job1.sh
#!/bin/sh
echo "job1 exec over"
----------
$ vim job2.sh
#!/bin/sh
echo "job2 exec over"
----------
$ vim job3.sh
#!/bin/sh
echo "job3 exec over"
----------
$ vim job4.sh
#!/bin/sh
echo "job4 exec over"
同样,将这些文件打成一个压缩包:
在WebServer上新建一个Project,并将压缩包上传:
此时展开job可以看到一个树状结构:
点击“Execute Flow”,可以看到任务之间的依赖图:
点击“Execute”执行任务,该方式是单次执行,不会调度执行。所有任务节点均执行成功,图中的节点都是绿色的:
在“Job List”可以看到任务列表,以及一些执行信息:
Azkaban用户代理
Azkaban代理用户:
- Azkaban可以代理其他linux用户执行命令
- 通过代理用户模式可以实现Hadoop的权限控制
编译用户代理模块:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# mkdir extlib
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# gcc /usr/local/src/azkaban/az-exec-util/src/main/c/execute-as-user.c -o extlib/execute-as-user
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# chmod 6050 extlib/execute-as-user
创建配置文件:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim plugins/jobtypes/commonprivate.properties
execute.as.user=true
azkaban.native.lib=/usr/local/azkaban/azkaban-exec-server/extlib/
azkaban.group.name=root
重启ExecuteServer:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/shutdown-exec.sh
Killing executor. [pid: 1982], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh
激活ExecutorServer:
$ curl http://localhost:46176/executor?action=activate
- Tips:ExecutorServer每次重启后端口都不一样
重启WebServer:
[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/shutdown-web.sh
Killing web-server. [pid: 2172], attempt: 1
shutdown succeeded
[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh
接下来提交任务测试一下,创建任务定义文件:
$ vim proxy.job
type=command
command=sh test.sh
编写对应的shell脚本:
$ vim test.sh
#!/bin/sh
echo "----------------"
whoami
echo "----------------"
将其打成压缩包:
创建“Project”,并上传压缩包:
然后点击“Execute Flow” -> “Execute”执行该任务,此时会发现执行失败了:
查看日志可以看到不允许代理‘azkaban’用户:
到操作系统上,新建一个用户:
$ useradd hadoop
- Tips:Azkaban默认是禁止代理root用户的
修改任务配置文件,指定代理用户,如下所示:
$ vim proxy.job
type=command
command=sh test.sh
user.to.proxy=hadoop
然后重新打包上传,重新执行该任务。这次任务执行成功,输出的日志如下:
以上的示例都是简单的执行一个shell脚本,如果想真正调度起一个MR任务其实也很简单,就只需要配置执行相应的命令就可以了。如下示例:
type=command
command=yarn jar /soft/home/hadoop-2.8.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.5.jar pi 16 1000
user.to.proxy=hadoop
关于Java操作Azkaban Api
除了可以在可视化的Azkaban WebServer界面上进行项目的创建、任务的上传/提交等操作外,Azkaban还支持通过HTTP API来完成这些操作。因为我们如果要开发自己的大数据平台,可能并不会使用Azkaban WebServer的可视化界面,而是希望在自己的大数据平台界面去与Azkaban进行交互,完成任务的调度管理。所以Azkaban提供了HTTP Api的支持,让我们可以轻松实现与自研平台的整合。
关于Azkaban Api的官方文档地址如下:
我这里准备了一个示例代码仓库,可以简单参考下: