JobManager源码分析

JobManager类

JobManager类用于对提交的任务进行管理。

初始化

JobManager初始化的过程主要是从配置文件(server/conf/sqoop.properties)中读取配置信息,
相关的配置信息包括了:任务的提交引擎,任务的执行引擎,以及对任务的提交信息进行更新的时间间隔(默认的时间 为一天 即源代码中的 DEFAULT_PURGE_THRESHOLD)以及更新时间。
之后根据配置信息,实例化提交引擎和执行引擎。以及根据时间间隔 启动submission清理的线程。和更新的线程。
初始化的源代码如下:

public synchronized void initialize() {
    LOG.trace("Begin submission engine manager initialization");
    MapContext context = SqoopConfiguration.getInstance().getContext();

    // Let's load configured submission engine
    String submissionEngineClassName =
      context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);

    submissionEngine = (SubmissionEngine) ClassUtils
      .instantiate(submissionEngineClassName);
    if (submissionEngine == null) {
      throw new SqoopException(DriverError.DRIVER_0001,
        submissionEngineClassName);
    }

    submissionEngine.initialize(context,
        DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);

    // Execution engine
    String executionEngineClassName =
      context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);

    executionEngine = (ExecutionEngine) ClassUtils
      .instantiate(executionEngineClassName);
    if (executionEngine == null) {
      throw new SqoopException(DriverError.DRIVER_0007,
        executionEngineClassName);
    }

    // We need to make sure that user has configured compatible combination of
    // submission engine and execution engine
    if (!submissionEngine
      .isExecutionEngineSupported(executionEngine.getClass())) {
      throw new SqoopException(DriverError.DRIVER_0008);
    }

    executionEngine.initialize(context,
        DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);

    // Set up worker threads
    purgeThreshold = context.getLong(
      DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
      DEFAULT_PURGE_THRESHOLD
      );
    purgeSleep = context.getLong(
      DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
      DEFAULT_PURGE_SLEEP
      );

    purgeThread = new PurgeThread();
    purgeThread.start();

    updateSleep = context.getLong(
      DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
      DEFAULT_UPDATE_SLEEP
      );

    updateThread = new UpdateThread();
    updateThread.start();

    SqoopConfiguration.getInstance().getProvider()
      .registerListener(new CoreConfigurationListener(this));

    LOG.info("Submission manager initialized: OK");
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,309评论 19 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 177,395评论 25 709
  • 嘿,我们似乎好久没见了,一个月了呢。好想你。
    喵77就是麻朵阅读 269评论 0 0
  • 力的作用是相互的,所以我相信,关于那个夏天,你和我一样,忘不了...... 书上说,太敏感的人不容易幸福,我们都太...
    黄洛晨阅读 282评论 0 1
  • Once, I was somewhere in Burma; at night in thatarea ther...
    第三个选项阅读 568评论 0 0

友情链接更多精彩内容