一、概括:
WordCount 作业的提交流程主要集中在 JobSubmitter.submitJobInternal 中,包括:检测输出目录的合法性、设置作业提交信息(主机和用户)、获取 JobID、向 HDFS 拷贝作业所需文件(job.jar、job.xml、split 文件等),最后执行作业提交。这里以 WordCount 为例介绍提交流程。
二、wordcount流程代码
/**
 * Entry point of the WordCount example: configures a MapReduce job, submits it and
 * exits with status 0 when the job succeeds, 1 otherwise.
 *
 * @param args command-line arguments (not used; input and output paths are hard-coded)
 * @throws IOException            propagated from job setup or submission
 * @throws ClassNotFoundException propagated from job submission
 * @throws InterruptedException   propagated from job submission
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  String inputPath = "data/wc.data";
  String outputPath = "out/";

  // Step 1: obtain the Job object.
  Configuration jobConf = new Configuration();
  Job wordCountJob = Job.getInstance(jobConf);

  // Project helper; presumably removes a pre-existing output directory so the job
  // can be re-run -- TODO confirm against FileUtils.
  FileUtils.deleteTarget(outputPath, jobConf);

  // Step 2: tell the job which jar to ship, located via the driver class.
  wordCountJob.setJarByClass(WordCountApp.class);

  // Step 3: register the custom mapper and reducer.
  wordCountJob.setMapperClass(MyMapper.class);
  wordCountJob.setReducerClass(MyReducer.class);

  // Step 4: key/value types emitted by the map phase.
  wordCountJob.setMapOutputKeyClass(Text.class);
  wordCountJob.setMapOutputValueClass(IntWritable.class);

  // Step 5: key/value types emitted by the reduce phase.
  wordCountJob.setOutputKeyClass(Text.class);
  wordCountJob.setOutputValueClass(IntWritable.class);

  // Step 6: input and output paths.
  FileInputFormat.setInputPaths(wordCountJob, new Path(inputPath));
  FileOutputFormat.setOutputPath(wordCountJob, new Path(outputPath));

  // Step 7: submit the job. waitForCompletion enters the submission flow and then
  // keeps monitoring the job state until it finishes.
  System.exit(wordCountJob.waitForCompletion(true) ? 0 : 1);
}
三、流程主要代码讲解
-
waitForCompletion
方法中主要是提交作业和监控状态,这里我们主要分析作业提交
/**
 * Submits the job if it is still in the DEFINE state, then waits for it to finish.
 *
 * @param verbose when true, progress is reported through monitorAndPrintJob();
 *                when false, completion is polled silently
 * @return the value of isSuccessful() once the job has completed
 * @throws IOException            propagated from submission or status queries
 * @throws InterruptedException   propagated from submission or monitoring
 * @throws ClassNotFoundException propagated from submission
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  // Submit only when the job has not been submitted yet.
  if (state == JobState.DEFINE) {
    submit();
  }

  if (!verbose) {
    // get the completion poll interval from the client.
    final int pollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(pollIntervalMillis);
      } catch (InterruptedException ignored) {
        // Deliberately ignored: the loop simply re-checks isComplete() and sleeps again.
      }
    }
  } else {
    // Monitor the job and print its status as it progresses.
    monitorAndPrintJob();
  }

  return isSuccessful();
}
-
submit
方法中主要是:1.连接集群。2.作业提交
/**
 * Submits the job: verifies it is in the DEFINE state, connects to the cluster,
 * hands the job to a JobSubmitter inside a doAs block, and marks it RUNNING.
 *
 * @throws IOException            propagated from connecting or submitting
 * @throws InterruptedException   propagated from connecting or submitting
 * @throws ClassNotFoundException propagated from connecting or submitting
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();

  // Connect to the cluster (the original note says: local or YARN).
  connect();

  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

  // The actual submission, wrapped so that it runs through ugi.doAs(...).
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          return submitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);

  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
- 连接集群时会通过 conf 构造一个 Cluster 实例,重点是 Cluster 的初始化部分
/**
 * Lazily creates the Cluster instance from this job's configuration.
 * Does nothing when a cluster has already been created.
 *
 * @throws IOException            propagated from Cluster construction
 * @throws InterruptedException   propagated from ugi.doAs
 * @throws ClassNotFoundException declared by the privileged action
 */
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    return;
  }

  // Build the Cluster through ugi.doAs(...).
  PrivilegedExceptionAction<Cluster> createCluster =
      new PrivilegedExceptionAction<Cluster>() {
        public Cluster run()
            throws IOException, InterruptedException, ClassNotFoundException {
          return new Cluster(getConfiguration());
        }
      };
  cluster = ugi.doAs(createCluster);
}
/**
 * Creates a Cluster from the given configuration by delegating to the
 * two-argument constructor with a null job tracker address.
 *
 * @param conf configuration used to initialize the cluster
 * @throws IOException propagated from the two-argument constructor
 */
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
/**
 * Creates a Cluster: stores the configuration, captures the current user and
 * then runs initialize(jobTrackAddr, conf).
 *
 * @param jobTrackAddr address forwarded to initialize; may be null (the
 *                     single-argument constructor passes null)
 * @param conf         configuration stored on this instance and forwarded to initialize
 * @throws IOException propagated from getCurrentUser() or initialize
 */
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
// Fields are assigned first; initialize runs last.
initialize(jobTrackAddr, conf);
}
/**
 * Selects the client protocol for this cluster by asking each
 * ClientProtocolProvider in frameworkLoader, in iteration order, to create one.
 * The first provider that returns a non-null protocol wins and is stored in
 * clientProtocolProvider / client.
 *
 * <p>A provider that throws is skipped. Its exception is logged with its stack
 * trace and attached as a suppressed exception to the IOException thrown when no
 * provider can be used, so the root cause is not lost.
 *
 * @param jobTrackAddr address passed to the provider; when null the provider is
 *                     asked to create the protocol from the configuration alone
 * @param conf         configuration passed to the provider
 * @throws IOException if no provider produced a client protocol
 */
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  // Created up front so that per-provider failures can be attached to it.
  final IOException initFailure = new IOException(
      "Cannot initialize Cluster. Please check your configuration for "
          + MRConfig.FRAMEWORK_NAME
          + " and the correspond server addresses.");

  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        // Original note: depending on the configuration this ends up creating
        // either a YARNRunner or a LocalJobRunner.
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        // First usable provider wins: remember it and stop searching.
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        // Keep trying the remaining providers, but preserve the cause both in
        // the log (with stack trace) and on the exception thrown below.
        initFailure.addSuppressed(e);
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage(), e);
      }
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw initFailure;
  }
}
- YARNRunner 构造方法
/**
 * Creates a YARNRunner from the given configuration, resource manager delegate
 * and client cache, and resolves the default FileContext from the configuration.
 *
 * @param conf           configuration stored on this instance
 * @param resMgrDelegate delegate stored on this instance
 * @param clientCache    client cache stored on this instance
 * @throws RuntimeException wrapping UnsupportedFileSystemException when the
 *                          FileContext cannot be obtained
 */
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
    ClientCache clientCache) {
  this.conf = conf;
  this.resMgrDelegate = resMgrDelegate;
  this.clientCache = clientCache;
  // Only the FileContext lookup can raise UnsupportedFileSystemException.
  try {
    this.defaultFileContext = FileContext.getFileContext(this.conf);
  } catch (UnsupportedFileSystemException e) {
    throw new RuntimeException("Error in instantiating YarnClient", e);
  }
}
- ResourceMgrDelegate构造方法
/**
 * Builds the delegate: passes this class's name to the superclass constructor,
 * stores the configuration, creates the wrapped YarnClient, then calls
 * init(conf) followed by start().
 *
 * @param conf YARN configuration stored on this instance and passed to init
 */
public ResourceMgrDelegate(YarnConfiguration conf) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
// Original note: the factory actually returns a YarnClientImpl -- not verifiable from this excerpt.
this.client = YarnClient.createYarnClient();
init(conf);
// Original note: start() is where the rmClient object gets created -- not verifiable from this excerpt.
start();
}
- 接下来看
JobSubmitter.submitJobInternal
/**
 * Internal job submission routine. In order, it: validates the output spec,
 * resolves the staging directory, records the submitting host, obtains a new
 * JobID, prepares tokens and the shuffle secret, copies job resources, writes
 * the input splits, writes the job configuration file and finally submits the
 * job through submitClient.
 *
 * If no JobStatus was obtained (an exception was thrown or submitJob returned
 * null), the per-job submit directory is deleted before the method exits.
 *
 * @param job     the job being submitted
 * @param cluster the cluster used to resolve the staging directory
 * @return the non-null status returned by submitClient.submitJob
 * @throws ClassNotFoundException propagated from the steps below
 * @throws InterruptedException   propagated from the steps below
 * @throws IOException            propagated from the steps below, or thrown with
 *                                "Could not launch job" when submitJob returns null
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// Original note: checks that the output directory is valid, i.e. whether it already exists or is unset.
//validate the jobs output specs
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// Resolve the staging directory that holds the files used while the job runs.
// Original note: defaults to /tmp/hadoop-yarn/staging/hadoop/.staging and can be changed
// through yarn.app.mapreduce.am.staging-dir -- not verifiable from this excerpt.
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
// Record the submitting host's name and address in the job configuration.
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
// Obtain a new JobID from the submit client (original note: this requires an RPC call).
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Per-job submit directory under the staging area
// (example from the original note: /tmp/hadoop-yarn/staging/root/.staging/job_xxxxxx).
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// Stays null until submitJob returns; the finally block uses this to decide on cleanup.
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
// Key length comes from configuration when the shuffle is encrypted, otherwise SHUFFLE_KEY_LENGTH.
int keyLen = CryptoUtils.isShuffleEncrypted(conf)
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
: SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
// Copy the files the job needs to the cluster
// (original note: -files, -libjars and -archives plus the job's own jar).
copyAndConfigureFiles(job, submitJobDir);
// Path of the job configuration file (job.xml) inside the submit directory.
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// Write the split files into the submit directory
// (original note: job.split and job.splitmetainfo).
// The returned count is stored as the number of map tasks.
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
// Resolve the queue name, falling back to JobConf.DEFAULT_QUEUE_NAME.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
// Persists this job's configuration as job.xml in the submit directory.
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// The actual submission. Original note: on YARN this goes to the ResourceManager
// through the underlying rmClient -- not verifiable from this excerpt.
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
// status is still null when anything above threw or submitJob returned null.
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
// Remove the per-job submit directory recursively.
jtFs.delete(submitJobDir, true);
}
}
}
-
writeSplits
方法中主要的writeNewSplits
方法
/**
 * Computes the input splits for the job with its configured InputFormat, sorts
 * them with SplitComparator and writes the split files into the submit directory.
 *
 * @param job          job context providing the configuration and InputFormat class
 * @param jobSubmitDir directory the split files are written to
 * @return the number of splits written
 * @throws IOException            propagated from split computation or writing
 * @throws InterruptedException   propagated from split computation or writing
 * @throws ClassNotFoundException propagated from resolving the InputFormat class
 */
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration jobConf = job.getConfiguration();

  // Instantiate the configured InputFormat reflectively
  // (original note: TextInputFormat is the default).
  InputFormat<?, ?> inputFormat =
      ReflectionUtils.newInstance(job.getInputFormatClass(), jobConf);

  // Let the InputFormat divide the input into splits.
  List<InputSplit> computedSplits = inputFormat.getSplits(job);
  T[] orderedSplits =
      (T[]) computedSplits.toArray(new InputSplit[computedSplits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(orderedSplits, new SplitComparator());

  // Write the split files into the submit directory.
  JobSplitWriter.createSplitFiles(jobSubmitDir, jobConf,
      jobSubmitDir.getFileSystem(jobConf), orderedSplits);

  return orderedSplits.length;
}
- 默认走的是 TextInputFormat 的 getSplits 方法(该方法继承自父类 FileInputFormat)
/**
 * Generates the list of input splits for every file returned by listStatus(job).
 *
 * <ul>
 *   <li>A zero-length file yields one empty split with no hosts.</li>
 *   <li>A non-splitable file yields one split covering the whole file, located
 *       on the hosts of its first block.</li>
 *   <li>A splitable file is cut into splits of computeSplitSize(...) bytes while
 *       the remaining bytes divided by the split size exceed SPLIT_SLOP; whatever
 *       remains becomes the final split.</li>
 * </ul>
 *
 * The number of input files is also stored in the job configuration under
 * NUM_INPUT_FILES.
 *
 * @param job the job context
 * @return the generated splits
 * @throws IOException propagated from file system access
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch timer = new Stopwatch().start();

  // Lower and upper bounds for the split size.
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> result = new ArrayList<InputSplit>();
  List<FileStatus> inputFiles = listStatus(job);
  for (FileStatus status : inputFiles) {
    Path path = status.getPath();
    long length = status.getLen();

    if (length == 0) {
      //Create empty hosts array for zero length files
      result.add(makeSplit(path, 0, length, new String[0]));
      continue;
    }

    // Look up the block locations of the file.
    BlockLocation[] blkLocations;
    if (status instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) status).getBlockLocations();
    } else {
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      blkLocations = fs.getFileBlockLocations(status, 0, length);
    }

    if (!isSplitable(job, path)) {
      // not splitable: the whole file becomes a single split.
      result.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
          blkLocations[0].getCachedHosts()));
      continue;
    }

    long blockSize = status.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    // Emit full-size splits while the remainder is more than SPLIT_SLOP split sizes.
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      long offset = length - bytesRemaining;
      int blkIndex = getBlockIndex(blkLocations, offset);
      result.add(makeSplit(path, offset, splitSize,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
      bytesRemaining -= splitSize;
    }

    // Whatever is left becomes the last split of this file.
    if (bytesRemaining != 0) {
      long offset = length - bytesRemaining;
      int blkIndex = getBlockIndex(blkLocations, offset);
      result.add(makeSplit(path, offset, bytesRemaining,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
    }
  }

  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, inputFiles.size());
  timer.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + result.size()
        + ", TimeTaken: " + timer.elapsedMillis());
  }
  return result;
}