剖析 azkaban load plugins

插件有很多,azkaban预留的接口多种多样,比如UserManager,Job,LoginAbstractAzkabanServlet等.

首先我要讲的是,jobType插件的加载.jobType只需要由Executor来加载.

executor 配置参数
execution.dir.retention->执行目录保存时间,单位毫秒,默认一天
azkaban.project.dir->工程运行时存放目录
azkaban.jobtype.plugin.dir->jobType插件的目录

public AzkabanExecutorServer(Props props) throws Exception {
    this.props = props;
    server = createJettyServer(props);//创建jetty server

    executionLoader = new JdbcExecutorLoader(props);//executor Dao
    projectLoader = new JdbcProjectLoader(props);//project dao
    runnerManager = new FlowRunnerManager(props, executionLoader, projectLoader, getClass().getClassLoader());//作业流运行管理器

    JmxJobMBeanManager.getInstance().initialize(props);

    // make sure this happens before
    configureJobCallback(props);

    configureMBeanServer();
    configureMetricReports();

    SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));

    loadCustomJMXAttributeProcessor(props);

    try {
      server.start();//启动jetty server
    } catch (Exception e) {
      logger.error(e);
      Utils.croak(e.getMessage(), 1);
    }

    insertExecutorEntryIntoDB();//往数据库表进行注册executor
    dumpPortToFile();

    logger.info("Started Executor Server on " + getExecutorHostPort());

    if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
      startExecMetrics();
    }
  }

jobType管理是在FlowRunnerManager中进行的,在FlowRunnerManager构造函数中创建了
JobTypeManager.

//JobTypeManager构造函数
 public JobTypeManager(String jobtypePluginDir, Props globalProperties,
      ClassLoader parentClassLoader) {
    this.jobTypePluginDir = jobtypePluginDir;//插件目录,值跟azkaban.properties中azkaban.jobtype.plugin.dir有关
    this.parentLoader = parentClassLoader;
    this.globalProperties = globalProperties;//executor全局属性配置,是executor.global.properties配置

    loadPlugins();//加载插件
  }

  public void loadPlugins() throws JobTypeManagerException {
    JobTypePluginSet plugins = new JobTypePluginSet();

    loadDefaultTypes(plugins);//加载自带插件
    if (jobTypePluginDir != null) {
      File pluginDir = new File(jobTypePluginDir);
      if (pluginDir.exists()) {
        logger
            .info("Job type plugin directory set. Loading extra job types from "
                + pluginDir);
        try {
          loadPluginJobTypes(plugins);//加载第三方插件
        } catch (Exception e) {
          logger.info("Plugin jobtypes failed to load. " + e.getCause(), e);
          throw new JobTypeManagerException(e);
        }
      }
    }

以下是JobTypeManager加载第三方的任务类型

private void loadJobTypes(File pluginDir, JobTypePluginSet plugins)
      throws JobTypeManagerException {
    // Directory is the jobtypeName
    String jobTypeName = pluginDir.getName();//目录名就是任务类型名
    logger.info("Loading plugin " + jobTypeName);

    Props pluginJobProps = null;
    Props pluginLoadProps = null;

    File pluginJobPropsFile = new File(pluginDir, JOBTYPECONFFILE);
    File pluginLoadPropsFile = new File(pluginDir, JOBTYPESYSCONFFILE);

    if (!pluginLoadPropsFile.exists()) {
      logger.info("Plugin load props file " + pluginLoadPropsFile
          + " not found.");
      return;
    }

    try {
      Props commonPluginJobProps = plugins.getCommonPluginJobProps();
      Props commonPluginLoadProps = plugins.getCommonPluginLoadProps();
      if (pluginJobPropsFile.exists()) {
        pluginJobProps = new Props(commonPluginJobProps, pluginJobPropsFile);
      } else {
        pluginJobProps = new Props(commonPluginJobProps);
      }

      pluginLoadProps = new Props(commonPluginLoadProps, pluginLoadPropsFile);
      pluginLoadProps.put("plugin.dir", pluginDir.getAbsolutePath());
      pluginLoadProps = PropsUtils.resolveProps(pluginLoadProps);//替换变量
    } catch (Exception e) {
      logger.error("pluginLoadProps to help with debugging: " + pluginLoadProps);
      throw new JobTypeManagerException("Failed to get jobtype properties"
          + e.getMessage(), e);
    }
    // Add properties into the plugin set
    plugins.addPluginLoadProps(jobTypeName, pluginLoadProps);
    if (pluginJobProps != null) {
      plugins.addPluginJobProps(jobTypeName, pluginJobProps);
    }

    ClassLoader jobTypeLoader =
        loadJobTypeClassLoader(pluginDir, jobTypeName, plugins);
    String jobtypeClass = pluginLoadProps.get("jobtype.class");

    Class<? extends Job> clazz = null;
    try {
      clazz = (Class<? extends Job>) jobTypeLoader.loadClass(jobtypeClass);
      plugins.addPluginClass(jobTypeName, clazz);
    } catch (ClassNotFoundException e) {
      throw new JobTypeManagerException(e);
    }

    logger.info("Verifying job plugin " + jobTypeName);
    try {
      Props fakeSysProps = new Props(pluginLoadProps);
      Props fakeJobProps = new Props(pluginJobProps);
      @SuppressWarnings("unused")
      Job job =
          (Job) Utils.callConstructor(clazz, "dummy", fakeSysProps,
              fakeJobProps, logger);
    } catch (Throwable t) {
      logger.info("Jobtype " + jobTypeName + " failed test!", t);
      throw new JobExecutionException(t);
    }

    logger.info("Loaded jobtype " + jobTypeName + " " + jobtypeClass);
  }

我们可以发现,其中有两种配置,一个common.properties,一个是commonprivate.properties.
commonprivate.properties是用于在executor加载插件的时候所需要配置参数.common.properties是job启动的时候需要通用配置参数.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,957评论 19 139
  • 准备工作 Azkaban Web服务器 azkaban-web-server-2.5.0.tar.gz Azkab...
    小马哥的程序之路阅读 3,636评论 0 3
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,958评论 6 342
  • 盒马购物中心在网上送来5元优惠券,使用规则是购买20元以上生鲜品,期限为三天。 收到优惠券后,我在购物中心买了一公...
    老乐铭阅读 175评论 0 1
  • 前几天,写过一篇文章,谈到了格物、致知。记得上高中的历史书上,写过宋明理学,其中就有格物致知。那会学历史是对...
    2016阅读 181评论 0 0