通过MapReduce的本地运行方式来分析MapReduce的执行过程

一、首先抛出结论(Map阶段)

    1. Job.afterCompletion():
        检查是否是running状态,如果是running避免重复提交!
        如果状态是define,提交!执行commit()
                    
    2. commit() :  
        创建Cluster对象,是Job运行的集群的抽象表达,包含JobRunner,及文件系统!
        根据Cluster获取Jobcommitter,提交Job
                    
    3. 提交前:
        ①确定当前Job的作业目录
        ②切片,将split.info / split.infometa在作业目录生成
        ③根据切片数,设置MapTask启动个数
        ④生成Job.xml文件(包含了所有的配置信息参数)
                    
    4. 提交: 
        在LocalJobRunner上重构Job对象!
        执行start(),启动一个线程!
                    
    5. Job的run()
                
        ①根据切片信息,获取包含切面及属性信息的数组,根据这个数组,确定List<RunableandThrowable>  mapTaskRunables
                            
        ②根据设置的numReduceTasks(默认为1),确定List<RunableandThrowable>  reduceeTaskRunables
                    
        ③创建线程池,开启多个线程,运行MapTask和ReduceTask
                    
    6. 每个MapTaskRunable对象,都会创建一个MapTask
        MapTask执行runNewMapper()执行Map阶段的核心逻辑!
                    
    7. 每个ReduceTaskRunable对象,都会创建一个ReduceTask
        ReduceTask执行runNewReducer()执行Rudece阶段的核心逻辑!

二、各阶段源码

  1. Job.waitForCompletion
public boolean waitForCompletion(boolean verbose) throws IOException, 
                        InterruptedException, ClassNotFoundException {
   // 判断job是定义状态,避免job提交后正在运行造成的重复提交
   if (state == JobState.DEFINE) {
       //此方法的核心在于submit()
       submit();
   }
   if (verbose) {        // 根据传入参数决定是否将job运行的过程打印显示
       monitorAndPrintJob();
   } else {
      // get the completion poll interval from the client.
       int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
       while (!isComplete()) {
           try {
               Thread.sleep(completionPollIntervalMillis);
           } catch (InterruptedException ie) {
           }
       }
    }
    return isSuccessful();
}

  1. submit()
    2.1 submit方法
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    //确认运行状态
    ensureState(JobState.DEFINE);
    //确定是否是使用的新API
    setUseNewAPI();
    // 创建Cluster对象,Cluster是集群的抽象表达,包含JobRunner,和文件系统
    connect();

    // 根据Cluster创建Job提交器
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
    // 使用Job提交器,提交Job,获取Job运行状态
    //ugi是UserGroupInformation类的实例,表示Hadoop中的用户和组信息,这个类包装了一个JAAS Subject以及提供了
    //方法来确定用户的名字和组,它同时支持Windows、Unix和Kerberos登录模块
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });

    // 将状态改为正在运行
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

2.2. setUseNewApi()

private void setUseNewAPI() throws IOException {
    // reduceTask的个数,通过mapreduce.job.reduces设置,默认是1
    int numReduces = conf.getNumReduceTasks();
    ......//后面是一系列的检查语句,此处不作具体解释
}
  1. Jobsubmitter提交Job
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    // 获取输出格式,调用方法,检查输入目录是否设置且不存在!
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    //将conf加入分布式缓存中
    addMRFrameworkToDistributedCache(conf);
    // 生成Job运行期间临时作业目录
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        //得到本机的提交地址
        submitHostAddress = ip.getHostAddress();
        //得到本机的主机名字
        submitHostName = ip.getHostName();
        //获取之后,在配置文件中进行submitHostName和submitHostAddress的设置
        conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // 生成Job的id
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // 根据Jobid,在Job作业目录,创建一个子目录
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try{
        ......//此段内容为权限检查

        // 生成当前Job的作业目录
        copyAndConfigureFiles(job, submitJobDir);
        // 获取当前Job总的配置文件Job.xml(包含8个配置文件中所有的信息)的路径
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

        // 生成当前Job的切片信息: Job.split(切片信息),job.splitinfo(切片的属性)
        // Job.split保存了有多少个切片对象,以及每个切片是从哪个文件切的哪部分
        // job.splitinfo记录的是每一个切片,应该去哪个主机来读取,块信息的DN主机
        int maps = writeSplits(job, submitJobDir);

        // 根据切片数,设置应该启动几个MapTask
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        ......//此段内容为权限检查
        // Write job file to submit dir
        // 将当前Job的配置信息生成到作业目录的job.xml中
        writeConf(conf, submitJobFile);
        
        // Now, actually submit the job (using the submit name)
        printTokens(jobId, job.getCredentials());

        // 正式提交
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    }finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}

4.1 提交Job
submitJob()方法是接口 ClientProtocol(RPC 协议)中的一个抽象方法。根据 RPC 原理,在【客户端代理对象submitClient】调用RPC协议中的submitJob()方法,此方法一定在服务端执行。该方法也有两种实现: LocalJobRunner(本地模式)和 YARNRunner(YARN模式)

//本地模式的Job提交方式,
public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);   //跳至第4.2阶段
    job.job.setCredentials(credentials);
    return job.status;
}

4.2 New Job

//重构Job对象!
public Job(JobID jobid, String jobSubmitDir) throws IOException {
     …… //各种设置,重构在LocalJobRunner上运行的Job
     // 开启一个线程
     this.start();
}
  1. Job.run()
//运行Job
public void run() {
    JobID jobId = profile.getJobID();
    JobContext jContext = new JobContextImpl(job, jobId);
    
    org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
    try {
        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
    } catch (Exception e) {
        LOG.info("Failed to createOutputCommitter", e);
        return;
    }
    
    try {
        // 根据切片信息,创建所有的切片及切片属性对象
        TaskSplitMetaInfo[] taskSplitMetaInfos = 
        SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

        int numReduceTasks = job.getNumReduceTasks();
        outputCommitter.setupJob(jContext);
        status.setSetupProgress(1.0f);
        // 使用一个Map记录每个MapTask最终保存文件的信息
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
        //创建MapTask运行的线程集合,有几片就启动几个线程
        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);
              
        initCounters(mapRunnables.size(), numReduceTasks);

        // 创建线程池
        ExecutorService mapService = createMapExecutor();
        // 开启Map阶段线程的运行
        //注意:mapreduce的运行过程中,使用了线程池的技术(放到队列当中,在将来的某个时刻进行执行)
        runTasks(mapRunnables, mapService, "map");

        try {
            if (numReduceTasks > 0) {
              //计算reduce对应的runnable个数
              List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
                  jobId, mapOutputFiles);
               //开启线程池
              ExecutorService reduceService = createReduceExecutor();
              //开启Reduce阶段线程的运行,此篇文章不讲解此阶段,后面新开文章讲解此阶段内容
              runTasks(reduceRunnables, reduceService, "reduce");
            }
        } finally {
            for (MapOutputFile output : mapOutputFiles.values()) {
              output.removeAll();
            }
        }
        // delete the temporary directory in output directory
        outputCommitter.commitJob(jContext);
        status.setCleanupProgress(1.0f);

        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.SUCCEEDED);
        }
        JobEndNotifier.localRunnerNotification(job, status);
    } catch (Throwable t) {
        try {
            outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
        } catch (IOException ioe) {
            LOG.info("Error cleaning up job:" + id);
        }
        status.setCleanupProgress(1.0f);
        if (killed) {
            this.status.setRunState(JobStatus.KILLED);
        } else {
            this.status.setRunState(JobStatus.FAILED);
        }
        LOG.warn(id, t);

        JobEndNotifier.localRunnerNotification(job, status);

    } finally {
        try {
            fs.delete(systemJobFile.getParent(), true);  // delete submit dir
            localFs.delete(localJobFile, true);              // delete local copy
            // Cleanup distributed cache
            localDistributedCacheManager.close();
        } catch (IOException e) {
            LOG.warn("Error cleaning up "+id+": "+e);
        }
    }
}

6.Job类中的runTasks()方法

private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
    // Start populating the executor with work units.
    // They may begin running immediately (in other threads).
    for (Runnable r : runnables) {
        //进行提交  是一个线程池,执行map或者reduce
        service.submit(r);
    }
    ...
}
  1. MapTaskRunable的run()
public void run() {
    try {
        // 生成当前线程的id
        TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);
        LOG.info("Starting task: " + mapId);
        mapIds.add(mapId);

        // 创建MapTask对象,这个对象负责当前线程(节点)map阶段逻辑的执行!
        MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);
        map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());

        //设置目录
        //map为MapTask类型的一个值,例如本次调试中所获取的值为:attempt_localXXX_0001_m_000000_0
        //localConf()加载配置文件的信息
        setupChildMapredLocalDirs(map, localConf);
        
        //创建一个map输出文件并设置配置信息
        // 当前MapTask生成的数据保存的对象
        MapOutputFile mapOutput = new MROutputFiles();
        mapOutput.setConf(localConf);
        mapOutputFiles.put(mapId, mapOutput);

        //指的是一个job_localXXX_0001.xml文件
        map.setJobFile(localJobFile.toString());
        localConf.setUser(map.getUser());
        map.localizeConfiguration(localConf);
        map.setConf(localConf);
        try {
            map_tasks.getAndIncrement();
            //launchMap()方法,进行启动map
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);     //此处将调用Maptask的run方法
            myMetrics.completeMap(mapId);
        } finally {
            map_tasks.getAndDecrement();
        }

        LOG.info("Finishing task: " + mapId);
    } catch (Throwable e) {
      this.storedException = e;
    }
}
  1. MapTask.run()
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map 
        // phase will govern the entire attempt's progress.
        if (conf.getNumReduceTasks() == 0) {
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // 如果需要对key-value进行排序,那么必须有reduce阶段!
            // If there are reducers then the entire attempt's progress will be 
            // split between the map phase (67%) and the sort phase (33%).
            mapPhase = getProgress().addPhase("map", 0.667f);
            sortPhase  = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    if (useNewApi) {
        //此处进入Map的核心处理阶段的入口
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}

9*. MapTask核心逻辑 RunNewMapper()

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
    // make a task context so we can get the classes
    // 创建配置的上下文
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper  实例化Mapper对象
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format  实例化InputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split  //重建切片 切片包含了当前片所属的文件及哪个部分
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
        // 负责直接将MapTask输出的结果输出到最终的输出目录
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // 负责将MapTask产生的结果进行收集,交给ReduceTask
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
        input.initialize(split, mapperContext);
        //进入之后运行用户自定义的mapper示例
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}

10.Mapper.run()

//运行自定义的Mapper!
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            //在此处开始运行自己写的mapper程序
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

参考文献:
MapReduce Job本地提交过程源码跟踪及分析https://blog.csdn.net/lemonZhaoTao/article/details/72943618

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容

  • 勒皮他人的外貌十分奇特,衣着也富有特色。仆人们都拿着一根系着一个充气的气囊的短棒,里面装有干碗豆和小石头以提醒沉浸...
    陆辰熠阅读 354评论 0 0
  • 醒来,已是早晨8点,在年末的最后一天睡了个懒觉,查看手机今日天气晴朗,空气良好。写下今日三件事,准备出发了。 上午...
    senny1978阅读 252评论 2 2
  • 时至午夜难入眠 不知谁家鼾声传 羡慕人家心态安 笑我人生看不穿 (可叹,可叹)
    秋心姐有故事阅读 729评论 8 39