《Hadoop-MapReduce源码解析》之二: org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal


1. org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

  /**
   * Internal method for submitting jobs to the system.
   * The job submission process involves:
   *   1. Checking the input and output specifications of the job.
   *   1. 检查作业输入输出规范
   *   2. Computing the InputSplits for the job.
   *   2. 计算作业的输入分片
   *   3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
   *   3. 如有必要,请为作业的分布式缓存设置必要的记帐信息
   *   4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
   *   4. 将作业的jar和配置复制到分布式文件系统上的map reduce系统目录中
   *   5. Submitting the job to the JobTracker and optionally monitoring it's status.
   *   5. 将作业提交给JobTracker,并可选择监视其状态
   * Params:
   *   job – the configuration to submit cluster – the handle to the Cluster
   * Throws:
   *   ClassNotFoundException –
   *   InterruptedException –
   *   IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // 验证作业的输出规范
    // 通常检查输出路径是否已经存在,当它已经存在时抛出异常,这样输出就不会被覆盖
    checkSpecs(job);

    // 除非明确关闭,否则Hadoop默认指定两个资源,按类路径的顺序加载:
    // core-default.xml:hadoop的只读默认值。
    // core-site.xml:给定hadoop安装的特定于站点的配置。
    Configuration conf = job.getConfiguration();

    // 从MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH("mapreduce.application.framework.path")中
    // 解析路径中的任何符号链接
    // 解析后的uri添加到分布式缓存中:
    // MRJobConfig.CACHE_ARCHIVES = "mapreduce.job.cache.archives";
    // conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
    //       : archives + "," + uri.toString());
    addMRFrameworkToDistributedCache(conf);

    // 获取放置作业特定文件的暂存目录
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      // 设置提交任务的主机名称和地址
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }

    // 这里的submitClient在后续解释,即:LocalJobRunner或者YARNRunner
    // 创建Applicant,生成JobId,返回JobId
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);

    // 提交作业的路径(Path parent, String child),将两个参数拼接为一个新路径
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // 作业状态
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      // 从与传递的路径(作业文件目录)相对应的名称节点获取委派令牌
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      // 从所有NAMENODE节点处获取委派令牌
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      // 获取Shuffle密钥来授权Shuffle转换
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      // 在Hadoop MapReduce中,当进行数据溢出(spill)时,会将部分数据从内存中写入磁盘以释内存间
      // 为保证数据安全,当启用加密中间数据溢出时,最大ApplicationMaster(AM)尝试次数设置为1
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      // 使用命令行选项-libjars、-files、-archives配置用户的jobconf
      // 并上载和配置与传递的作业相关的文件、libjar、jobjar和归档文件
      copyAndConfigureFiles(job, submitJobDir);
      
      // 获取作业conf文件,即:new Path(jobSubmitDir, "job.xml");
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      // 计算任务输入的分片,并返回分片数量,即map任务的数量
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);  
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // 如果计算出来的map数大于设置的或者默认的最大map数,抛出异常
      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
          MRJobConfig.DEFAULT_JOB_MAX_MAP);
      if (maxMaps >= 0 && maxMaps < maps) {
        throw new IllegalArgumentException("The number of map tasks " + maps +
            " exceeded limit " + maxMaps);
      }

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 将“作业提交到什么队列”写入job文件
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      // 在将jobconf复制到HDFS之前删除jobtoken引用,因为任务不需要此设置;
      // 实际上它们可能会因此而中断,因为引用将指向不同的作业。
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        // 添加DHFS tracking ids:跟踪标识符,该标识符可用于在多个客户端会话中关联令牌的使用情况
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      // reservationId是全局唯一作业的保留标识符,如果作业没有任何关联的保留,则为null
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // 将submitJobFile写到JobTracker的文件系统中去
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // 正式提交Job到Yarn或者本地
      status = submitClient.submitJob(  // 具体见Hadoop-MapReduce源码解析》之三
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        // 返回Job提交后的状态
        return status;
      } else {
        // 任务启动失败
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          // 清空暂存目录
          jtFs.delete(submitJobDir, true);

      }
    }
  }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容