从源码角度分析MapReduce运作_一.准备阶段

一.目录

本系列文章对Hadoop知识进行复盘。
分为两个阶段,建立连接阶段,提交job阶段。

waitForCompletion()
 
submit();
 
// 1建立连接
   connect();  
      // 1)创建提交Job的代理
      new Cluster(getConfiguration());
         // (1)判断是本地yarn还是远程
         initialize(jobTrackAddr, conf);
 
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
   // 1)创建给集群提交数据的Stag路径
   Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
 
   // 2)获取jobid ,并创建Job路径
   JobID jobId = submitClient.getNewJobID();
 
   // 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir); 
   rUploader.uploadFiles(job, jobSubmitDir);
 
   // 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
      maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);
 
   // 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
   conf.writeXml(out);
 
   // 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

二.建立连接

客户端提交MR程序后,首先是运行job.waitForCompletion(true),所以从waitForCompletion方法开始分析。

/**
   * Submit the job to the cluster and wait for it to finish. 
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    //提交作业,重点
    }
    if (verbose) {
      monitorAndPrintJob();  // 监控任务状态
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

进入submit方法

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    // 1.建立连接
    connect(); 
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
     // 2.提交job
        return submitter.submitJobInternal(Job.this, cluster); 
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

进入connect方法

private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     //  1)创建提交job的代理 
                     return new Cluster(getConfiguration()); 
                   }
                 });
    }
  }

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // 判断是本地还是远程
    initialize(jobTrackAddr, conf); 
  }
  
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            // 如果是远程,则创建yarn代理;如果是本地,则创建local代理
            clientProtocol = provider.create(conf);  
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

三.提交job

接着来看submitJobInternal方法,用来提交作业到集群上,主要是以下几个步骤:

  • 检查作业的输入输出
  • 计算作业的分片
  • 设置job相关的计算信息
  • 复制需要的jar和配置信息到文件系统上
  • 提交作业以及监控其状态
/**
   * Internal method for submitting jobs to the system.
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    //检查输出路径是否存在,若存在抛出异常
    checkSpecs(job);    

    ...

    // 1)创建给集群提交数据的stage目录
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    ....

    // 2)获取JobID,并创建job路径
    JobID jobId = submitClient.getNewJobID(); 
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());//创建staging目录下的JobID文件夹
    
    ...

    try {
      .....
     
      // 3)拷贝jar包到集群
      copyAndConfigureFiles(job, submitJobDir);
    
      .....

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // 4)计算Jobmap端的切片,生成切片规划文件
      int maps = writeSplits(job, submitJobDir);  
      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      ...

      // 5)把job的配置信息写入staging+JobID目录下的job.xml
      writeConf(conf, submitJobFile); 
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // 6)真正开始提交作业,返回提交状态
      status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); 
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)     
          jtFs.delete(submitJobDir, true);//删除staging+JobID目录下所有东西
      }
    }
  }

接着我们看writeSplits方法

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);  // 获取新的切片,重点
    } else {  
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

进入writeNewSplits方法

private <T extends InputSplit> int writeNewSplits(JobContext job, 
Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf); 
    List<InputSplit> splits = input.getSplits(job); // 获取切片,重点
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

这里我们来看下FileInputFormat对应的getSplits方法

/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 获取切片大小,重点

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  // 判断文件是否切片,重点
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          ....
    }
    return splits;
  }

这里有两个点需要注意下:

  • 切片的大小,是通过Math.max(minSize, Math.min(maxSize, blockSize))获取。默认minSize的值为1,maxSize的值为Long类型的最大值(即9223372036854775807),blockSize是块大小,故默认切片大小为块大小。
  • 文件是否切片,是通过((double) bytesRemaining)/splitSize > SPLIT_SLOP判断。SPLIT_SLOP的值为1.1,如果剩余文件大小/切片大小>1.1,则切片。
    接着我们回到submitJobInternal方法中,查看submitClient.submitJob动作。submitClient有本地和yarn两种,这里以yarn方式举例。
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    // 把job提交到ResourceManager上
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

最后就是监控任务状态,等待返回任务执行结果,参考monitorAndPrintJob方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352