Job Submission
Submission source code
// true means verbose: print the job's progress while it runs
boolean b = job.waitForCompletion(true);
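For context, waitForCompletion is the last line of a typical driver. A minimal sketch of such a driver (the Demo* class names and paths are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "demo");
job.setJarByClass(DemoDriver.class);      // hypothetical driver class
job.setMapperClass(DemoMapper.class);     // hypothetical mapper
job.setReducerClass(DemoReducer.class);   // hypothetical reducer
FileInputFormat.setInputPaths(job, new Path("/demo/input"));
FileOutputFormat.setOutputPath(job, new Path("/demo/output"));
// Everything below this call is what the rest of this walkthrough traces
System.exit(job.waitForCompletion(true) ? 0 : 1);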
Job.java
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // The job is still in DEFINE state at this point, so submit() runs
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    // Monitor the job and print its progress; once the job finishes,
    // the staging (temp) directory contents are deleted
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
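In non-verbose mode the client just sleeps and polls for completion. The interval comes from the client configuration; a small sketch, assuming the standard key mapreduce.client.completion.pollinterval (default 5000 ms in mapred-default.xml):

// Poll for job completion every second instead of the default 5 s
conf.setInt("mapreduce.client.completion.pollinterval", 1000);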
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  // Make sure the job is still in DEFINE state
  ensureState(JobState.DEFINE);
  // Decide between the old and new MapReduce API for compatibility
  setUseNewAPI();
  // Connect to the cluster; if no user is configured, doAs() runs as
  // the current login user of this machine
  connect();
  // Build a JobSubmitter carrying the submit FileSystem, client, and status
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      // After all that preparation, the actual submission finally happens here
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // Submission is done; the JobState flips to RUNNING
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
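The ugi.doAs wrapper is why the submitting user matters. A sketch of submitting as an explicit user rather than the OS login user (the user name here is made up; setting the HADOOP_USER_NAME environment variable achieves the same thing):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

// Run the whole submission as a specific (hypothetical) user
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hadoopuser");
boolean ok = ugi.doAs(
    (PrivilegedExceptionAction<Boolean>) () -> job.waitForCompletion(true));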
The submission continues in JobSubmitter.java
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // Validate the output spec: the output path must be set and must not
  // already exist, otherwise this throws
  checkSpecs(job);

  // Peek inside: this holds everything from the *-site.xml config files
  Configuration conf = job.getConfiguration();
  // Literally what the name says: add the MR framework to the distributed
  // cache, i.e. ship the framework and config to every MapTask and ReduceTask
  addMRFrameworkToDistributedCache(conf);

  // Compute the staging (temp) directory; in local mode it lives on the
  // local file system, which is why it can show up on the D: drive
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  // Generate the unique job id
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Define a path under the staging area; nothing is created on disk yet
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  // Configure the shuffle secret, queue, and so on
  ...
  // Once this call runs, the directory defined above actually exists:
  // JobSubmitter calls rUploader.uploadResources(), JobResourceUploader
  // mkdirs the directory, and uploadJobJar copies the job jar into it
  // (in local mode there is no jar to upload; in cluster mode the jar
  // must be shipped to the cluster, so it goes into this directory first)
  copyAndConfigureFiles(job, submitJobDir);

  // The config file path: <submitJobDir>/job.xml
  Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

  // Create the splits for the job
  LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
  // The split computation; the split files are written into the staging dir
  int maps = writeSplits(job, submitJobDir);
  // The number of splits becomes the number of map tasks
  conf.setInt(MRJobConfig.NUM_MAPS, maps);
  ...
  // Write the merged configuration (all the xml settings) to job.xml
  writeConf(conf, submitJobFile);
  //
  // Now, actually submit the job (using the submit name)
  //
  printTokens(jobId, job.getCredentials());
  // The real submission; afterwards the job's status becomes RUNNING
  status = submitClient.submitJob(
      jobId, submitJobDir.toString(), job.getCredentials());
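At this point the staging directory for a cluster submission holds four things: job.jar (the job code), job.split and job.splitmetainfo (written by writeSplits below), and job.xml (the merged configuration). Where the staging area lives is configurable; a sketch, assuming the LocalJobRunner key mapreduce.jobtracker.staging.root.dir (YARN mode uses yarn.app.mapreduce.am.staging-dir instead):

// Move the local-mode staging area off the default drive
// (key assumed from LocalJobRunner; the path value is made up)
conf.set("mapreduce.jobtracker.staging.root.dir", "/tmp/mr-staging");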
Input Splits
Split source code
JobSubmitter.java
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  // jobSubmitDir is the staging directory generated above
  // jConf carries the job info (id and so on)
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  // New-API (org.apache.hadoop.mapreduce) split path; this is the default
  // in Hadoop 2.x/3.x and the branch this walkthrough follows
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    // Old-API (org.apache.hadoop.mapred) split path
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
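Which branch runs depends on how the driver was written, not on the Hadoop version: setUseNewAPI() picks the new path when the job is built from the org.apache.hadoop.mapreduce classes. A minimal contrast, with made-up mapper class names:

// New API: Job + org.apache.hadoop.mapreduce.Mapper -> writeNewSplits
Job job = Job.getInstance(conf);
job.setMapperClass(DemoMapper.class);        // extends mapreduce.Mapper

// Old API: JobConf + org.apache.hadoop.mapred.Mapper -> writeOldSplits
JobConf oldJob = new JobConf(conf);
oldJob.setMapperClass(OldDemoMapper.class);  // implements mapred.Mapper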
// The new-API split computation
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the configured InputFormat via reflection
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  // This is where the splits are actually computed!
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  // Write job.split / job.splitmetainfo into the staging directory
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
InputFormat has many implementations. The default is TextInputFormat, which inherits getSplits from FileInputFormat.java, so that is where we land.
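The driver can swap in a different implementation; for example, CombineTextInputFormat packs many small files into fewer splits instead of cutting one split per file. A sketch (the 4 MB cap is an arbitrary example value):

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Pack small files together instead of producing one split per file
job.setInputFormatClass(CombineTextInputFormat.class);
// Cap each combined split at 4 MB (example value)
CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);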
FileInputFormat.java
public static long getMaxSplitSize(JobContext context) {
  // SPLIT_MAXSIZE = mapreduce.input.fileinputformat.split.maxsize;
  // if unset, the maximum split size is effectively unlimited
  return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                            Long.MAX_VALUE);
}
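Both bounds can be set from the driver. Since splitSize = max(minSize, min(maxSize, blockSize)), raising minSize above the block size makes splits bigger, while lowering maxSize below it makes them smaller. A sketch with example values:

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Force splits of at least 256 MB (minSize > blockSize => bigger splits)
FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
// Or force splits of at most 64 MB (maxSize < blockSize => smaller splits)
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);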
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // Minimum split size: getFormatMinSplitSize() returns 1;
  // getMinSplitSize(job) reads mapreduce.input.fileinputformat.split.minsize
  // (default 0 in mapred-default.xml), so minSize defaults to 1
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // Maximum split size (Long.MAX_VALUE unless configured)
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
...
The loop walks the file list and cuts splits file by file; splitting never crosses file boundaries:
for (FileStatus file: files) {
  if (ignoreDirs && file.isDirectory()) {
    continue;
  }
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) {
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    // Can this file be split? (some compressed formats cannot)
    if (isSplitable(job, path)) {
      // The HDFS block size of this file
      long blockSize = file.getBlockSize();
      // splitSize = max(minSize, min(maxSize, blockSize)),
      // i.e. the block size by default
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);
      // Bytes of this file not yet assigned to a split
      long bytesRemaining = length;
      // SPLIT_SLOP = 1.1: keep cutting full-size splits as long as the
      // remainder is more than 1.1x the split size
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
        bytesRemaining -= splitSize;
      }

      // After the loop, whatever is left over (at most 1.1x splitSize)
      // becomes the final split
      if (bytesRemaining != 0) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                   blkLocations[blkIndex].getHosts(),
                   blkLocations[blkIndex].getCachedHosts()));
      }
    } else { // not splitable: the whole file becomes a single split
      splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                  blkLocations[0].getCachedHosts()));
    }
  }
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
...
return splits;
}
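To make the 1.1 slop concrete: with a 128 MB split size, a 129 MB file gives 129/128 ≈ 1.008 ≤ 1.1, so it stays one 129 MB split; a 300 MB file gives 300/128 ≈ 2.34, so it becomes 128 MB + 128 MB + a 44 MB remainder. A standalone sketch of the same loop (the sizes are illustrative):

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;

  // Print the split sizes FileInputFormat's loop would produce for one file
  static void show(long length, long splitSize) {
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.println("split: " + splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      System.out.println("last split: " + bytesRemaining);
    }
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    show(129 * mb, 128 * mb);  // one 129 MB split (1.008 <= 1.1)
    show(300 * mb, 128 * mb);  // 128 MB, 128 MB, 44 MB
  }
}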