MapReduce 原理和源码-实战

Local模式下的 MapReduce 计算步骤(图解)

步骤详解

  1. "main"线程中完成input切片和Job提交
  • 本地构建submitJobDir临时目录
  • 根据InputPath文件数和blockSize大小进行分片: InputSplit[]
  • 将分片信息和作业配置一起写入到submitJobDir中;
  • 将submitJobDir下所有上传(本地告知)MR作业Job,完成提交;
  1. "?"线程完成作业信息的接收和解析
  • 处理submitJobDir提交上来的信息;
  • 根据conf信息创建相关类
  1. "Thread-3"的Job.run()线程执行Job的运算
  • 根据分片构建Map任务的List<RunnableWithThrowable>
  • 遍历提交每个MapTask线程任务;
    • 下接: "LocalJobRunner Map Task"线程的 JobRunner.MapTaskRunnable.run()
  • 根据job.reduces数量构建Reduce的线程任务reduceRunnables: List<RunnableWithThrowable>
  • 遍历并执行每个ReduceTask线程任务;
    • 下接: "pool-n-thread-1"线程中的 ReduceTaskRunnable.run()方法;
  1. "LocalJobRunner Map Task"线程: 完成一个分配Map计算;
  • 获取map对应分片信息和数据,
  • 构建MapTask任务
  • input.init加载分片输入数据;
  • mapper.run() 循环调用User定义的map()方法完成计算;
  • 将Map结果写入中间shuffle文件并告知Tracker;
  1. "pool-3-thread-1"线程完成一次Reduce计算
  • 构建ReduceTask任务;
  • 构建shuffleContext 洗牌任务并执行shuffle.run()洗牌;
  • 基于shuffle后的结果数据, 封装进KeyValueIterator迭代器中, 并执行reducer.reduce()聚合计算;
  • 将reduce计算结果写入保存到OutputPath路径上;
  • 结束Job并释放/消耗相应资源;

"main"线程切片和提交

JobSubmitter.submitJobInternal源码.png

构建上传目录submitJobDir的原理源码

源码详解:

JobSubmitter.submitJobInternal(){
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);{
        client.getStagingAreaDir();{
            Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging"));
            user = ugi.getShortUserName() + rand.nextInt(Integer.MAX_VALUE);//username-randid;
            return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
        }
    }
    JobID jobId = submitClient.getNewJobID();{//LocalJobRunner.getNewJobID(): 以local+上面rand.nextInt(MAX)生成的随机数+0/n, 作为jobId;
        return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
    }
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
}
LocalJobRunner.getStagingAreaDir()方法.png
生成submitJobDir的构成.png

1.2 计算并写入split信息: writeSplits()

job.split文件: 应该是对应每个FileSplit, 他的Zfile类型和存储Path;

// job.split的内容
org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_33M.data   

/org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_10M.data

// job.splitmetainfo的内容, ?
4d45 5441 2d53 504c 0102 0109 6c6f 6361
6c68 6f73 7407 8c02 1000 0001 096c 6f63
616c 686f 7374 8fa2 8da0 0004 

MR Driver端的 Submit MR Job的逻辑

  • 先构建本地 submitJobDir临时目录;
  • 再 根据InputPath 获取文件分片 splits
  • 最后 分片和配置信息, 统一writeConf 写入到 conf文件中;
  • submitJob() 将Job和配置, jars都上传到远程MR服务;
UserDriver.main(){
    
    job.waitForCompletion(true){//Job: 
        if (state == JobState.DEFINE) {
            submit();{
                setUseNewAPI();//设置新的Api
                connect();{//创建Cluster对象,用于表示与目标FileSystem建立了连接;
                    if (cluster == null) {
                         cluster =  ugi.doAs(new PrivilegedExceptionAction<Cluster>() { return new Cluster(getConfiguration());  });
                    }
                }
                // 构建JobSubmitter , 主要是封装了: jtFs:目标文件系统(本地/hdfs), submitClient(通信客户端), hostAndPort: 地址端口;
                final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
                
                status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
                    run(){return submitter.submitJobInternal(Job.this, cluster);{//JobSubmitter.submitJobInternal()
                        // 校验Jobs 的输出格式和合法性;
                        checkSpecs(job);
                        
                        //根据(哈希)算法创建本地 submitJobDir目录,用于收集/存放/提交 Job所需配置和资源;
                        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
                        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
                        
                        try{
                            copyAndConfigureFiles(job, submitJobDir);
                            
                            /* 核心代码: 根据file文件数量, 创建相关MR任务的分片/分区: splits
                            *
                            */
                            int maps = writeSplits(job, submitJobDir);{//JobSubmitter.writeSplits()
                                if (jConf.getUseNewMapper()) {//mapred.mapper.new-api==true属性时,用新的api
                                    maps = writeNewSplits(job, jobSubmitDir);{
                                        List<InputSplit> splits = input.getSplits(job);{//FileInputFormat.getSplits()
                                            List<FileStatus> files = listStatus(job);{
                                                Path[] dirs = getInputPaths(job);{//FileInputFormat.getInputPaths()
                                                    String dirs = context.getConfiguration().get(INPUT_DIR, "");// FileInputFormat.setInputPaths() => mapreduce.input.fileinputformat.inputdir (INPUT_DIR) ,即传入Input路劲;
                                                    Path[] result = new Path[list.length];
                                                    return result;
                                                }
                                                
                                                // 根据是否递归,递归获取子文件
                                                boolean recursive = getInputDirRecursive(job);//INPUT_DIR_RECURSIVE参数(mapreduce.input.fileinputformat.input.dir.recursive)设定
                                            }
                                        }
                                        
                                        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
                                        Arrays.sort(array, new SplitComparator());
                                        return array.length;
                                    }
                                }
                            }
                            conf.setInt(MRJobConfig.NUM_MAPS, maps);
                            
                            writeConf(conf, submitJobFile);{
                                FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
                                conf.writeXml(out);
                            }
                            status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
                        }finally{
                            jtFs.delete(submitJobDir, true); //清除 submitJobDir临时提交目录;
                        }
                    }}
                });
            }
        }
        if (verbose) {//打印状态;
            monitorAndPrintJob();
        }
    }
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。