Job任务提交到执行源码分析(一)

以官方Hadoop中的 WordCount案例分析 ,Job作业的提交过程:

public static void main(String[] args) throws Exception {
          // Create a new Job
          Configuration conf=new Configuration(true);
          Job job = Job.getInstance(conf);
          job.setJarByClass(MyWorkCountJob.class);
         // Specify various job-specific parameters    
          job.setJobName("myWorkCountjob");
          //设置输入文件路径
          FileInputFormat.addInputPath(job, new Path("/user/root/hello.txt"));
          //设置输出文件路径
          Path outPath=new Path("/sxt/mr/output");
          if(FileSystem.get(conf).exists(outPath))
           FileSystem.get(conf).delete(outPath);
          FileOutputFormat.setOutputPath(job, outPath);
         job.setMapperClass(MyMapper.class);
          
          job.setReducerClass(MyReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          // Submit the job, then poll for progress until the job is complete
          job.waitForCompletion(true);//job 提交的入口
     }

waitForCompletion方法

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();// 任务提交1.1
    }
    if (verbose) {
      monitorAndPrintJob();//实时监控Job任务并打印相关的日志
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

1.1 submit 方法

public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);//确定job状态
    setUseNewAPI();//默认使用新的API
    connect();//获得与集群的连接
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
            //异步调用submitJobInternal方法提交任务 1.2
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法

1.2 submitJobInternal 方法

JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {

    //检查job的输出路径是否存在,如果存在则抛出异常
    checkSpecs(job);
    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

      //初始化临时目录和返回的输出路径。
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
      //获取新的JobId 
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
      // 获取提交目录
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    ......
        //把作业上传到集群中去
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
     
      // 创建切片列表 找出每个文件的切片列表 合并切片列表的数量就是Map任务个数  客户端统计
      int maps = writeSplits(job, submitJobDir); //2.1核心方法
      conf.setInt(MRJobConfig.NUM_MAPS, maps);//文件分片的大小 就是Map任务数量
      ......

      // Write job file to submit dir    相关配置写入到job.xml中
      writeConf(conf, submitJobFile);
     
      // Now, actually submit the job (using the submit name) 真正的提交作业
      status = submitClient.submitJob( //2.3 提交job到RecourceManager
          jobId, submitJobDir.toString(), job.getCredentials());
     ...
  }

2.1 文件切片操作 writeSplits -> writeNewSplits 计算向数据移动模型的核心

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
      //将split信息和SplitMetaInfo都写入HDFS中
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(InputFormat有不同实现类 框架默认的是TextInputFormat)。此时的Input即为TextInputFormat的父类FileInputFormat,其getSplits方法的实现如下:

 public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//默认最小值 1
    long maxSize = getMaxSplitSize(job);//默认最大值 Long类型的最大值

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);//获取源文件的源信息列表
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
            //获取文件的block块列表
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path))
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        //核心代码块
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
//核心代码块结束
          if (bytesRemaining != 0) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

2.2 核心代码块分析
对每个输入文件进行split划分。注意这只是个逻辑的划分 因此执行的是FileInputFormat类中的getSplits方法。只有非压缩的文件和几种特定压缩方式压缩后的文件才分片。分片的大小由如下几个参数决定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的blocksize大小确定。
具体计算方式为:
Math.max(minSize, Math.min(maxSize, blockSize))
分片的大小有可能比默认块大小64M要大,当然也有可能小于它,默认情况下分片大小为当前HDFS的块大小,64M

第一步 将bytesRemaining(剩余未分片字节数)初始化设置为整个文件的长度
第二步 如果bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit,SPLIT_SLOP(默认1.1)。接着就会执行如下方法获取block的索引,其中第二个参数是这个block在整个文件中的偏移量

protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block? 核心代码块 判断当前的偏移量是否在某个block中 是就返回当前index 位置信息
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

第三步 将符合条件的块的索引对应的block信息的主机节点以及文件的路径名、开始的偏移量、分片大小splitSize封装到一个InputSplit中加入List<InputSplit> splits 列表。

第四步 bytesRemaining -= splitSize修改剩余字节大小 循环以上操作 直到不满足条件 剩余bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block加入splits列表

以上是 整个getSplits获取切片的过程。当使用基于FileInputFormat实现InputFormat时,为了提高MapTask的数据本地化,应尽量使InputSplit大小与block大小相同

2.3 submitter 实现了ClientProtocol接口的类 在1.1中connect()连接集群时 调用init初始化方法 由框架读取 HDFS的配置文件中配置了mapreduce.framework.name属性为“yarn”的话,会创建一个YARNRunner对象 submitter 就是YARNRunner 对象
submitter.submitJobInternal(Job.this, cluster)

YARNRunner的构造方法:

public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
     ClientCache clientCache) {
   this.conf = conf;
   try {
     this.resMgrDelegate = resMgrDelegate;
     this.clientCache = clientCache;
     this.defaultFileContext = FileContext.getFileContext(this.conf);
   } catch (UnsupportedFileSystemException ufe) {
     throw new RuntimeException("Error in instantiating YarnClient", ufe);
   }
 }

ResourceMgrDelegate实际上ResourceManager的代理类,其实现了YarnClient接口,通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查询Job运行状态等操作。
YarnRunner 类的submitJob方法

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    addHistoryToken(ts);
    // Construct necessary information to start the MR AM
       //Client构造ASC。ASC中包括了调度队列,优先级,用户认证信息,除了这些基本的信息之外,还包括用来启动AM的CLC信息,一个CLC中包括jar包、依赖文件、安全token,以及运行任务过程中需要的其他文件
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext); // 2.4 提交ASC到RecoureManeger 

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

2.4 到这里一个Client就完成了一次Job任务的提交

2.5 YARN 框架 统一的资源管理 任务调度

yarn.png

相关的角色
**ResourceManager **
集群节点资源的统一管理

**NodeManager ** 每个DN上都会对应一个NM进程

  • 与RM汇报资源的使用情况
  • 管理运行的Container生命周期
    Container:【节点NM上CPU,MEM,I/O大小等资源的虚拟描述】

MR-ApplicationMaster-Container
每个Job作业对应一个AM,避免单点故障,负载到不同的节点
创建Task时需要和RM申请资源(Container),然后向存放具体资源的DN通信,由DN创建Container并且启动进程同时下发任务(这里就实现了计算向数据移动

Task-Container 任务执行进程
DN上执行的JVM进程,接收到AM下发的任务后,通过反射机制创建具体的任务对象后 执行具体的任务

** 执行流程**
1 RM 在空闲的DN 上启动AM
2 AM向RM申请资源 ,RM将资源分配信息给AM
3 AM在和数据所在的NM节点通信,创建Container并且通知NM启动Container(JVM进程),分发具体任务到NM上,Container通过反射调起具体的任务类执行
4 如果是MapReduce框架 则进入到MapTask流程 具体分析见 http://liujiacai.net/blog/2014/09/07/yarn-intro/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容