MapReduce

提交

提交源码

//true代表打印执行过程
boolean b = job.waitForCompletion(true);

Job.java

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,                                            ClassNotFoundException {
//Job现在状态还是DEFINE,会执行submit
    if (state == JobState.DEFINE) {
      submit();
    if (verbose) {
//监控打印job的信息,执行完临时目录的内容就删掉了
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
    }
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    //确认状态
    ensureState(JobState.DEFINE);
    //设置新旧api兼容
    setUseNewAPI();
    //设置连接,如果没设置用户,doAs设置当前主机为登录用户
    connect();
    //设置提交job的fs,客户端,状态等信息
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(),          cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
       //墨迹半天终于提交了
        return       submitter.submitJobInternal(Job.this, cluster);
    //提交流程结束,JobState改变为running
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());

提交进JobSubmitter.java

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //检查输出路径 
    checkSpecs(job);
    //进去看一眼,全是site.xml,存的都是配置文件
    Configuration conf = job.getConfiguration();
//字面意思,添加MR框架去分布式缓存
//大概就是把配置文件发给每个maptask和//reducetask    addMRFrameworkToDistributedCache(conf);
    //会生成一个临时目录,可为什么在d盘啊
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    //生成了唯一标识jobid
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
//在刚才生成的临时目录下定义路径,准备生成
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
   //配置suffleid/qeque什么的
   ...
   //这个方法一执行,刚才的定义的路径就生成文件夹了
  //进到JobSubmitter里了,执行rUploader.uploadResources
   //然后进JobResourceUploader里,mkdir生成了文件夹
  //uploadJobJar,集群模式没jar,要提交到集群,所以先提交到文件夹里
   copyAndConfigureFiles(job, submitJobDir);
   //路径是文件夹路径/job.xml
   Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
     //切片源码,执行完切片文件放入刚才文件夹
      int maps = writeSplits(job, submitJobDir);
     //设置切片个数,切片赋给MRJobConfig.NUM_MAPS了
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
     ...
  //把xml放进来了,都是配置信息
   writeConf(conf, submitJobFile);
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
    //设置job为的status变为running
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

切片

切片源码

JobSubmitter.java

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    //jobSubmitDir 存储路径(就是生成那个临时文件夹)
    //jConf是job的信息,id什么的
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //hadoop3.x的切片方式,我就进入这个方法。
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
   //hadoop2.x的切片方式
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
 //hadoop3.x具体切片
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   //切片了!!!!!!!!!!!
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

InputFormat有很多实现类。默认进入FileInputFormat.java实现类
FileInputFormat.java

 public static long getMaxSplitSize(JobContext context) {
   //SPLIT_MAXSIZE不设置没有
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
   //切片最小的size,getFormatMinSplitSize()返回1
   //getMinSplitSize()设置在yarn-default里,默认值是0
   //mapreduce.input.fileinputformat.split.minsize
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //切片最大的size
    long maxSize = getMaxSplitSize(job);
    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
   ...
    循环遍历文件列表进行切片,说明是按照文件进行切分
    for (FileStatus file: files) {
             if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //文件是否可以切割
        if (isSplitable(job, path)) {
          //获取块大小
          long blockSize = file.getBlockSize();
           //获取切片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          //一个文件的大小
          long bytesRemaining = length;
          //SPLIT_SLOP=1.1 
          //如果文件大于切片大小的1.1倍就切片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          //感觉就是循环切完片了,如果还剩下点,就放在最后一片
            if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
      }else { 
          // not splitable
      }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
  }
图片.png

图片.png

图片.png

提交


图片.png

切片
图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容