一、概述
Azkaban的执行代码主要分为四大模块:
(1)azkaban-webserver:主要提供web界面展示和接收http请求的模块
(2)azkaban-execserver:具体执行任务的模块
(3)azkaban-common:公共模块,提供访问数据库,告警等公共类
(4)azkaban-plugins:插件模块,主要包含hive、spark、mr、java等模块
下面分别讲解各个模块的代码结构
二、azkaban-webserver
入口类是AzkabanWebServer 的 main方法。以下是该类的主要代码逻辑
1、加载 conf 目录下的 azkaban.properties 和 azkaban.private.properties 两个文件,获取用户配置的系统参数
Props azkabanSettings = AzkabanServer.loadProps(args);
2、构建Server服务对象,Azkaban的web服务由Jetty的提供的
final Server server = new Server();
3、是否采用ssl,由配置参数jetty.use.ssl的值确定,如果为true,则访问的时候需要用https,否则默认为http服务。此处我们设置jetty.use.ssl=false
if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
//配置ssl服务
server.addConnector(secureConnector);
} else {
//获取http端口
port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
server.addConnector(connector);
}
4、构建Context
Context root = new Context(server, "/", Context.SESSIONS);
5、配置静态资源路径
ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
root.addServlet(staticServlet, "/css/*");
root.addServlet(staticServlet, "/js/*");
root.addServlet(staticServlet, "/images/*");
root.addServlet(staticServlet, "/fonts/*");
root.addServlet(staticServlet, "/favicon.ico");
6、配置动态请求路径对应的 Servlet,每种请求类型都由对应的servlet处理
root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
7、启动服务
try {
server.start();
} catch (Exception e) {
logger.warn(e);
Utils.croak(e.getMessage(), 1);
}
8、最后定义了一个ShutdownHook,用于在进程挂掉的时候打印当时的进程资源利用情况
Runtime.getRuntime().addShutdownHook(new Thread() {
...
}
每个处理请求的Servlet都有对应的两个方法,分别用来处理GET和POST请求
protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
}
protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
}
每个Servlet根据不同的参数值,调用具体的方法进行处理。
我们来看一个具体的接口调用过程:http://hostname:8081/executor?ajax=executeFlow&project=testProject&flow=testFlow
根据AzkabanWebServer中定义的请求处理路径,/executor 请求将会分发到ExecutorServlet类中处理,进入该类,在类的一开始就定义了一些Manager类:
private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;
这些 Manager 类就是具体处理数据的业务类。
由于以上的请求是GET请求,我们从handleGet方法开始跟踪
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
} else if (hasParam(req, "execid")) {
... ...
}
由于我们存在ajax参数,所以进入handleAJAXAction方法,然后我们的ajax参数值是executeFlow,所以进入以下方法
ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());
该方法先检查project和flow参数值是否正确,然后调用
ajaxExecuteFlow(req, resp, ret, user);
进行任务的提交,azkaban-webserver的主要逻辑大体都是这样。
三、azkaban-common
azkaban是典型的JavaWeb系统,如果说azkaban-webserver是表现层的话,那azkaban-common就是业务层和持久层,业务层逻辑封装在各个XXXManager中,持久层就是JdbcExecutorLoader类。我们接着分析以上的请求。
executorManager.submitExecutableFlow调用了azkaban-common模块的ExecutorManager类下的submitExecutableFlow方法,该方法封装exflow变量的各种属性值后,调用以下代码进行入库
executorLoader.uploadExecutableFlow(exflow);
同时调用dispatch方法从后台发送请求到azkaban-execserver模块进行具体任务的执行
dispatch(reference, exflow, choosenExecutor);
我们先看入库功能,点击进入executorLoader.uploadExecutableFlow方法,就是在JdbcExecutorLoader类中,azkaban用了JDBC接口与mysql交互,只是做了简单的封装,对外提供一个QueryRunner类进行数据库操作,代码如下
private synchronized void uploadExecutableFlow(Connection connection,
ExecutableFlow flow, EncodingType encType)
throws ExecutorManagerException, IOException {
//需要执行的sql
final String INSERT_EXECUTABLE_FLOW =
"INSERT INTO execution_flows "
+ "(project_id, flow_id, version, status, submit_time, submit_user, update_time, origin_exec_id) "
+ "values (?,?,?,?,?,?,?,?)";
//获取执行对象,这里其实就是DataSource
QueryRunner runner = new QueryRunner();
long submitTime = System.currentTimeMillis();
long id;
try {
flow.setStatus(Status.PREPARING);
//执行sql
runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
submitTime, flow.getSubmitUser(), submitTime, flow.getOriginExecId());
connection.commit();
//获取生成的最新id,作为execid
id =
runner.query(connection, LastInsertID.LAST_INSERT_ID,
new LastInsertID());
if (id == -1L) {
throw new ExecutorManagerException(
"Execution id is not properly created.");
}
logger.info("Flow given " + flow.getFlowId() + " given id " + id);
flow.setExecutionId((int) id);
//更新exec_id字段为新生成的id值
updateExecutableFlow(connection, flow, encType);
} catch (SQLException e) {
throw new ExecutorManagerException("Error creating execution.", e);
}
}
至此,入库就完成了。我们再回过头来看 dispatch(reference, exflow, choosenExecutor)方法。一步步跟踪下去后发现,其最终是发起了一个http请求,代码如下
private String callExecutorForJsonString(String host, int port, String path,
List<Pair<String, String>> paramList) throws IOException {
if (paramList == null) {
paramList = new ArrayList<Pair<String, String>>();
}
ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
@SuppressWarnings("unchecked")
#封装http请求,请求路径指向了azkaban-execserver模块
URI uri =
ExecutorApiClient.buildUri(host, port, path, true,
paramList.toArray(new Pair[0]));
#发起get请求,并获得返回值
return apiclient.httpGet(uri, null);
}
这里发起的http请求最后由azkaban-execserver模块的ExecutorServlet模块接收。下面继续分析azkaban-execserver模块。
四、azkaban-execserver
该模块的入口类是AzkabanExecutorServer,其main方法也像web模块一样,封装了各个请求路径的处理servlet,如下
Context root = new Context(server, "/", Context.SESSIONS);
root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
所以我们在common模块看到的代码callExecutorForJsonObject(host, port, "/executor", paramList);其实就是请求了这里的ExecutorServlet类进行处理。
由于是GET请求,我们进入ExecutorServlet类的doGet方法;由于common中封装的参数是EXECUTE_ACTION,所以我们进入以下代码
else if (action.equals(EXECUTE_ACTION)) {
handleAjaxExecute(req, respMap, execid);
}
在handleAjaxExecute类中调用submitFlow方法
flowRunnerManager.submitFlow(execId, lastExecId);
在submitFlow方法中构造了FlowRunner对象
FlowRunner runner =
new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);
并提交到ThreadPool中执行。在FlowRunner的run方法中,其实是构造了一个JobRunner对象
JobRunner jobRunner =
new JobRunner(node, path.getParentFile(), executorLoader,
jobtypeManager);
并提交到线程池中执行
JobRunner runner = createJobRunner(node);
logger.info("Submitting job '" + node.getNestedId() + "' to run.");
try {
//提交执行
executorService.submit(runner);
activeJobRunners.add(runner);
} catch (RejectedExecutionException e) {
logger.error(e);
}
在JobRunner的run方法中才真正运行job任务。该方法调用prepareJob进行一些初始化工作,主要是封装job类
try {
job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
} catch (JobTypeManagerException e) {
logger.error("Failed to build job type", e);
return false;
}
在buildJobExecutor获取任务的type类型,如command、hive、java、HadoopJava等,其实就是azkaban的jobtypes下对应的几个文件夹名称
buildJobExecutor方法中主要根据配置的type对于的class路径,通过反射的方式获取对应的实例
public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
throws JobTypeManagerException {
//获取type对应的class路径
Job job = null;
try {
String jobType = jobProps.getString("type");
... ...
Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
if (executorClass == null) {
throw new JobExecutionException(String.format("Job type '" + jobType
+ "' is unrecognized. Could not construct job[%s] of type[%s].",
jobProps, jobType));
}
... ...
job =
(Job) Utils.callConstructor(executorClass, jobId, pluginLoadProps,
jobProps, logger);
} catch (Exception e) {
logger.error("Failed to build job executor for job " + jobId
+ e.getMessage());
throw new JobTypeManagerException("Failed to build job executor for job "
+ jobId, e);
}
return job;
}
以上的jobType就是azkaban的plugins/jobtypes下的每个子目录中的private.properties文件,如hadoopJava的private.properties文件内容是:
jobtype.class=azkaban.jobtype.HadoopJavaJob
该类在 azkaban-jobtype-x.x.x.jar包里,对应的项目是azkaban-plugins
五、azkaban-plugins
这里介绍azkaban-plugins模块的代码,举例的类型为java。我们来回顾Java类型的任务流编写流程
JavaMain.java
package com.dataeye.java;
import org.apache.log4j.Logger;
import azkaban.utils.Props;
public class JavaMain {
private static final Logger logger = Logger.getLogger(JavaMain.class);
private final int fileRows;
private final int fileLine;
public JavaMain(String name, Props props) throws Exception {
this.fileRows = props.getInt("file.rows");
this.fileLine = props.getInt("file.line");
}
public void run() throws Exception {
logger.info(" ### this is JavaMain method ###");
logger.info("fileRows value is ==> " + fileRows);
logger.info("fileLine value is ==> " + fileLine);
}
}
java.job
type=java
job.class=com.dataeye.java.JavaMain
classpath=lib/*
file.rows=10
file.line=50
根据上面的源码分析可知java类型的工作流任务初始化类在plugins/jobtypes/java目录下的private.properties文件,内容如下
jobtype.class=azkaban.jobtype.JavaJob
所以我们的入口类就是azkaban-plugins模块的JavaJob类,入口方法是run()方法。
在父类的run方法中调用了JavaProcessJob类的getCommandList()
try {
var27 = this.getCommandList();
} catch (Exception var24) {
this.handleError("Job set up failed " + var24.getCause(), var24);
}
该方法实际上是在构建java执行的命令
protected List<String> getCommandList() {
ArrayList list = new ArrayList();
list.add(this.createCommandLine());
return list;
}
protected String createCommandLine() {
String command = JAVA_COMMAND + " ";
command = command + this.getJVMArguments() + " ";
command = command + "-Xms" + this.getInitialMemorySize() + " ";
command = command + "-Xmx" + this.getMaxMemorySize() + " ";
command = command + "-cp " + this.createArguments(this.getClassPaths(), ":") + " ";
command = command + this.getJavaClass() + " ";
command = command + this.getMainArguments();
return command;
}
其中this.getJavaClass()其实是调用了JavaJob类中的方法
protected String getJavaClass() {
return JavaJobRunnerMain.class.getName();
}
所以正式执行job任务是在JavaJobRunnerMain类的main方法中。
在JavaJobRunnerMain类的构造方法中有如下代码
final String runMethod =
props.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
_logger.info("Invoking method " + runMethod);
实际上是通过反射的方式调用了用户自定义类的run方法,如下
private void runMethod(Object obj, String runMethod)
throws IllegalAccessException, InvocationTargetException,
NoSuchMethodException {
obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
}
这样用户的run方法就可以执行了,该job就可以在azkaban中运行。