五、Azkaban源码总体分析

一、概述

Azkaban的执行代码主要分为四大模块:

(1)azkaban-webserver:主要提供web界面展示和接收http请求的模块
(2)azkaban-execserver:具体执行任务的模块
(3)azkaban-common:公共模块,提供访问数据库,告警等公共类
(4)azkaban-plugins:插件模块,主要包含hive、spark、mr、java等模块

下面分别讲解各个模块的代码结构

二、azkaban-webserver

入口类是AzkabanWebServer 的 main方法。以下是该类的主要代码逻辑

1、加载 conf 目录下的 azkaban.properties 和 azkaban.private.properties 两个文件,获取用户配置的系统参数

Props azkabanSettings = AzkabanServer.loadProps(args);

2、构建Server服务对象,Azkaban的web服务由Jetty的提供的

final Server server = new Server();

3、是否采用ssl,由配置参数jetty.use.ssl的值确定,如果为true,则访问的时候需要用https,否则默认为http服务。此处我们设置jetty.use.ssl=false

if (azkabanSettings.getBoolean("jetty.use.ssl", true)) {
      //配置ssl服务
      server.addConnector(secureConnector);
} else {
      //获取http端口      
      port = azkabanSettings.getInt("jetty.port", DEFAULT_PORT_NUMBER);
      SocketConnector connector = new SocketConnector();
      connector.setPort(port);
      connector.setHeaderBufferSize(MAX_HEADER_BUFFER_SIZE);
      server.addConnector(connector);
}

4、构建Context

Context root = new Context(server, "/", Context.SESSIONS);

5、配置静态资源路径

ServletHolder staticServlet = new ServletHolder(new DefaultServlet());
root.addServlet(staticServlet, "/css/*");
root.addServlet(staticServlet, "/js/*");
root.addServlet(staticServlet, "/images/*");
root.addServlet(staticServlet, "/fonts/*");
root.addServlet(staticServlet, "/favicon.ico");

6、配置动态请求路径对应的 Servlet,每种请求类型都由对应的servlet处理

root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager");
root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
root.addServlet(new ServletHolder(new HistoryServlet()), "/history");
root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");
root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers");
root.addServlet(new ServletHolder(new StatsServlet()), "/stats");

7、启动服务

try {
      server.start();
} catch (Exception e) {
      logger.warn(e);
      Utils.croak(e.getMessage(), 1);
}

8、最后定义了一个ShutdownHook,用于在进程挂掉的时候打印当时的进程资源利用情况

Runtime.getRuntime().addShutdownHook(new Thread() {
  ...
}

每个处理请求的Servlet都有对应的两个方法,分别用来处理GET和POST请求

protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
      Session session) throws ServletException, IOException {
}

protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
      Session session) throws ServletException, IOException {
}

每个Servlet根据不同的参数值,调用具体的方法进行处理。

我们来看一个具体的接口调用过程:http://hostname:8081/executor?ajax=executeFlow&project=testProject&flow=testFlow

根据AzkabanWebServer中定义的请求处理路径,/executor 请求将会分发到ExecutorServlet类中处理,进入该类,在类的一开始就定义了一些Manager类:

private ProjectManager projectManager;
private ExecutorManagerAdapter executorManager;
private ScheduleManager scheduleManager;

这些 Manager 类就是具体处理数据的业务类。

由于以上的请求是GET请求,我们从handleGet方法开始跟踪

if (hasParam(req, "ajax")) {
      handleAJAXAction(req, resp, session);
} else if (hasParam(req, "execid")) {
    ... ...
}

由于我们存在ajax参数,所以进入handleAJAXAction方法,然后我们的ajax参数值是executeFlow,所以进入以下方法

ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());

该方法先检查project和flow参数值是否正确,然后调用

ajaxExecuteFlow(req, resp, ret, user);

进行任务的提交,azkaban-webserver的主要逻辑大体都是这样。

三、azkaban-common

azkaban是典型的JavaWeb系统,如果说azkaban-webserver是表现层的话,那azkaban-common就是业务层和持久层,业务层逻辑封装在各个XXXManager中,持久层就是JdbcExecutorLoader类。我们接着分析以上的请求。

executorManager.submitExecutableFlow调用了azkaban-common模块的ExecutorManager类下的submitExecutableFlow方法,该方法封装exflow变量的各种属性值后,调用以下代码进行入库

executorLoader.uploadExecutableFlow(exflow);

同时调用dispatch方法从后台发送请求到azkaban-execserver模块进行具体任务的执行

dispatch(reference, exflow, choosenExecutor);

我们先看入库功能,点击进入executorLoader.uploadExecutableFlow方法,就是在JdbcExecutorLoader类中,azkaban用了JDBC接口与mysql交互,只是做了简单的封装,对外提供一个QueryRunner类进行数据库操作,代码如下

private synchronized void uploadExecutableFlow(Connection connection,
                                                 ExecutableFlow flow, EncodingType encType)
          throws ExecutorManagerException, IOException {
    //需要执行的sql
    final String INSERT_EXECUTABLE_FLOW =
            "INSERT INTO execution_flows "
                    + "(project_id, flow_id, version, status, submit_time, submit_user, update_time, origin_exec_id) "
                    + "values (?,?,?,?,?,?,?,?)";
    //获取执行对象,这里其实就是DataSource
    QueryRunner runner = new QueryRunner();
    long submitTime = System.currentTimeMillis();

    long id;
    try {
      flow.setStatus(Status.PREPARING);
      //执行sql
      runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(),
              flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(),
              submitTime, flow.getSubmitUser(), submitTime, flow.getOriginExecId());
      connection.commit();
    //获取生成的最新id,作为execid
    id =
              runner.query(connection, LastInsertID.LAST_INSERT_ID,
                      new LastInsertID());

      if (id == -1L) {
        throw new ExecutorManagerException(
                "Execution id is not properly created.");
      }
      logger.info("Flow given " + flow.getFlowId() + " given id " + id);
      flow.setExecutionId((int) id);
      //更新exec_id字段为新生成的id值
      updateExecutableFlow(connection, flow, encType);

    } catch (SQLException e) {
      throw new ExecutorManagerException("Error creating execution.", e);
    }
  }

至此,入库就完成了。我们再回过头来看 dispatch(reference, exflow, choosenExecutor)方法。一步步跟踪下去后发现,其最终是发起了一个http请求,代码如下

private String callExecutorForJsonString(String host, int port, String path,
    List<Pair<String, String>> paramList) throws IOException {
    if (paramList == null) {
      paramList = new ArrayList<Pair<String, String>>();
    }

    ExecutorApiClient apiclient = ExecutorApiClient.getInstance();
    @SuppressWarnings("unchecked")
    #封装http请求,请求路径指向了azkaban-execserver模块
    URI uri =
      ExecutorApiClient.buildUri(host, port, path, true,
        paramList.toArray(new Pair[0]));
    #发起get请求,并获得返回值
    return apiclient.httpGet(uri, null);
  }

这里发起的http请求最后由azkaban-execserver模块的ExecutorServlet模块接收。下面继续分析azkaban-execserver模块。

四、azkaban-execserver

该模块的入口类是AzkabanExecutorServer,其main方法也像web模块一样,封装了各个请求路径的处理servlet,如下

Context root = new Context(server, "/", Context.SESSIONS);
    root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);

    root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
    root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
    root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
    root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");

所以我们在common模块看到的代码callExecutorForJsonObject(host, port, "/executor", paramList);其实就是请求了这里的ExecutorServlet类进行处理。

由于是GET请求,我们进入ExecutorServlet类的doGet方法;由于common中封装的参数是EXECUTE_ACTION,所以我们进入以下代码

else if (action.equals(EXECUTE_ACTION)) {
    handleAjaxExecute(req, respMap, execid);
} 

在handleAjaxExecute类中调用submitFlow方法

flowRunnerManager.submitFlow(execId, lastExecId);

在submitFlow方法中构造了FlowRunner对象

FlowRunner runner =
        new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);

并提交到ThreadPool中执行。在FlowRunner的run方法中,其实是构造了一个JobRunner对象

JobRunner jobRunner =
        new JobRunner(node, path.getParentFile(), executorLoader,
            jobtypeManager);

并提交到线程池中执行

JobRunner runner = createJobRunner(node);
logger.info("Submitting job '" + node.getNestedId() + "' to run.");
try {
  //提交执行
  executorService.submit(runner);
  activeJobRunners.add(runner);
} catch (RejectedExecutionException e) {
  logger.error(e);
}

在JobRunner的run方法中才真正运行job任务。该方法调用prepareJob进行一些初始化工作,主要是封装job类

try {
    job = jobtypeManager.buildJobExecutor(this.jobId, props, logger);
} catch (JobTypeManagerException e) {
    logger.error("Failed to build job type", e);
    return false;
}

在buildJobExecutor获取任务的type类型,如command、hive、java、HadoopJava等,其实就是azkaban的jobtypes下对应的几个文件夹名称

jobtypes目录

buildJobExecutor方法中主要根据配置的type对于的class路径,通过反射的方式获取对应的实例

public Job buildJobExecutor(String jobId, Props jobProps, Logger logger)
      throws JobTypeManagerException {
    //获取type对应的class路径
    Job job = null;
    try {
      String jobType = jobProps.getString("type");
      ... ...
      Class<? extends Object> executorClass = pluginSet.getPluginClass(jobType);
      if (executorClass == null) {
        throw new JobExecutionException(String.format("Job type '" + jobType
            + "' is unrecognized. Could not construct job[%s] of type[%s].",
            jobProps, jobType));
      }

      ... ...
      job =
          (Job) Utils.callConstructor(executorClass, jobId, pluginLoadProps,
              jobProps, logger);
    } catch (Exception e) {
      logger.error("Failed to build job executor for job " + jobId
          + e.getMessage());
      throw new JobTypeManagerException("Failed to build job executor for job "
          + jobId, e);
    }
    return job;
  }

以上的jobType就是azkaban的plugins/jobtypes下的每个子目录中的private.properties文件,如hadoopJava的private.properties文件内容是:

jobtype.class=azkaban.jobtype.HadoopJavaJob

该类在 azkaban-jobtype-x.x.x.jar包里,对应的项目是azkaban-plugins

五、azkaban-plugins

这里介绍azkaban-plugins模块的代码,举例的类型为java。我们来回顾Java类型的任务流编写流程

JavaMain.java

package com.dataeye.java;

import org.apache.log4j.Logger;

import azkaban.utils.Props;

public class JavaMain {

    private static final Logger logger = Logger.getLogger(JavaMain.class);

    private final int fileRows;
    private final int fileLine;
    
    public JavaMain(String name, Props props) throws Exception {
        this.fileRows = props.getInt("file.rows");
        this.fileLine = props.getInt("file.line");
    }
    
    public void run() throws Exception {
        logger.info(" ### this is JavaMain method ###");
        logger.info("fileRows value is ==> " + fileRows);
        logger.info("fileLine value is ==> " + fileLine);
    }
    
}

java.job

type=java
job.class=com.dataeye.java.JavaMain
classpath=lib/*
file.rows=10
file.line=50

根据上面的源码分析可知java类型的工作流任务初始化类在plugins/jobtypes/java目录下的private.properties文件,内容如下

jobtype.class=azkaban.jobtype.JavaJob

所以我们的入口类就是azkaban-plugins模块的JavaJob类,入口方法是run()方法。

在父类的run方法中调用了JavaProcessJob类的getCommandList()

try {
    var27 = this.getCommandList();
} catch (Exception var24) {
    this.handleError("Job set up failed " + var24.getCause(), var24);
}

该方法实际上是在构建java执行的命令

protected List<String> getCommandList() {
    ArrayList list = new ArrayList();
    list.add(this.createCommandLine());
    return list;
}

protected String createCommandLine() {
    String command = JAVA_COMMAND + " ";
    command = command + this.getJVMArguments() + " ";
    command = command + "-Xms" + this.getInitialMemorySize() + " ";
    command = command + "-Xmx" + this.getMaxMemorySize() + " ";
    command = command + "-cp " + this.createArguments(this.getClassPaths(), ":") + " ";
    command = command + this.getJavaClass() + " ";
    command = command + this.getMainArguments();
    return command;
}

其中this.getJavaClass()其实是调用了JavaJob类中的方法

protected String getJavaClass() {
    return JavaJobRunnerMain.class.getName();
}

所以正式执行job任务是在JavaJobRunnerMain类的main方法中。

在JavaJobRunnerMain类的构造方法中有如下代码

final String runMethod =
    props.getProperty(RUN_METHOD_PARAM, DEFAULT_RUN_METHOD);
_logger.info("Invoking method " + runMethod);

实际上是通过反射的方式调用了用户自定义类的run方法,如下

private void runMethod(Object obj, String runMethod)
      throws IllegalAccessException, InvocationTargetException,
      NoSuchMethodException {
    obj.getClass().getMethod(runMethod, new Class<?>[] {}).invoke(obj);
 }

这样用户的run方法就可以执行了,该job就可以在azkaban中运行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,797评论 6 342
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,608评论 18 399
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,230评论 11 349
  • 莎士比亚说:“怀疑,是大家必须通过的大门口。 只有通过这个大门口,才能进入真理的殿堂。” 所...
    王奇才阅读 447评论 0 2