MapReducer任务在到Yarn上运行流程分析

  1. 以WordCount为例
public class WordCount
{
  public static void main(String[] args)
    throws Exception
  {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    //开始提交
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
  {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
      throws IOException, InterruptedException
    {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      this.result.set(sum);
      context.write(key, this.result);
    }
  }

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
    {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
      }
    }
  }
}

2.Job类分析

 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    //开始提交
    }
    if (verbose) {
      monitorAndPrintJob();   //提交成功的话,监控打印任务进度
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

4.Job submit方法

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();        //根据配置信息向resourcemanager建立联系
    //得到提交器
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      //提交器提交任务
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
  1. submitter.submitJobInternal(Job.this, cluster);
 //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    //会使用Rpc与resoucemanager建立联系,得到JobId
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    //任务在yarn上的目录,存放jar包资源文件等
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }
      //把任务的jar包复制放到HDFS上的任务目录下
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      //写分片信息
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
     //设置任务的队列,每个用户会有一个队列
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      //所有的配置文件写到任务目录下
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      //真正开始提交任务
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }

5 写分片信息 writeSplits(job, submitJobDir);

     private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   //对文件进行切分,得到分片信息
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    //对分片信息进行排序,大的分片先运行
    Arrays.sort(array, new SplitComparator());
    //将分片元数据信息写到文件中,文件在HDFS的任务目录下
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
  

6.org.apache.hadoop.mapred.YARNRunner由这个类来进行提交

 @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    设置ApplicatonMaster的初始化信息,包括jar包,资源,启动命令等,每个应用都有一个ApplicationMaster
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
     //再交由底层的,使用rpc协议提交到resourcemanager
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);
      //整个后面,判断appMaster的提交状态
      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

7.ApplicationMaster的初始化信息

public ApplicationSubmissionContext createApplicationSubmissionContext(
  Configuration jobConf,
  String jobSubmitDir, Credentials ts) throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(
    conf.getInt(
        MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
        )
    );
capability.setVirtualCores(
    conf.getInt(
        MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
        )
    );
LOG.debug("AppMaster capability = " + capability);

// Setup LocalResources
Map<String, LocalResource> localResources =
    new HashMap<String, LocalResource>();

Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

URL yarnUrlForJobSubmitDir = ConverterUtils
    .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
        .resolvePath(
            defaultFileContext.makeQualified(new Path(jobSubmitDir))));
LOG.debug("Creating setup context, jobSubmitDir url is "
    + yarnUrlForJobSubmitDir);

localResources.put(MRJobConfig.JOB_CONF_FILE,
    createApplicationResource(defaultFileContext,
        jobConfPath, LocalResourceType.FILE));
if (jobConf.get(MRJobConfig.JAR) != null) {
  Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
  LocalResource rc = createApplicationResource(
      FileContext.getFileContext(jobJarPath.toUri(), jobConf),
      jobJarPath,
      LocalResourceType.PATTERN);
  String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
      JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
  rc.setPattern(pattern);
  localResources.put(MRJobConfig.JOB_JAR, rc);
} else {
  // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
  // mapreduce jar itself which is already on the classpath.
  LOG.info("Job jar is not present. "
      + "Not adding any jar to the list of resources.");
}

// TODO gross hack
for (String s : new String[] {
    MRJobConfig.JOB_SPLIT,
    MRJobConfig.JOB_SPLIT_METAINFO }) {
  localResources.put(
      MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
      createApplicationResource(defaultFileContext,
          new Path(jobSubmitDir, s), LocalResourceType.FILE));
}

// Setup security tokens
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
    + "/bin/java");

Path amTmpDir =
    new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
vargs.add("-Djava.io.tmpdir=" + amTmpDir);
MRApps.addLog4jSystemProperties(null, vargs, conf);

// Check for Java Lib Path usage in MAP and REDUCE configs
warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
    MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
    MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
    MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
    MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

// Add AM admin command opts before user command opts
// so that it can be overridden by user
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
    MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
vargs.add(mrAppMasterAdminOptions);

// Add AM user command opts
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
    MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
    MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
vargs.add(mrAppMasterUserOptions);

if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
    MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
  final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
      MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
  if (profileParams != null) {
    vargs.add(String.format(profileParams,
        ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
            + TaskLog.LogName.PROFILE));
  }
}

vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
    Path.SEPARATOR + ApplicationConstants.STDERR);


Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
  mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());

LOG.debug("Command to launch container for ApplicationMaster is : "
    + mergedCommand);

// Setup the CLASSPATH in environment
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, conf);

// Shell
environment.put(Environment.SHELL.name(),
    conf.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL,
        MRJobConfig.DEFAULT_SHELL));

// Add the container working directory in front of LD_LIBRARY_PATH
MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
    MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);

// Setup the environment variables for Admin first
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
// Setup the environment variables (LD_LIBRARY_PATH, etc)
MRApps.setEnvFromInputString(environment, 
    conf.get(MRJobConfig.MR_AM_ENV), conf);

// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);

Map<ApplicationAccessType, String> acls
    = new HashMap<ApplicationAccessType, String>(2);
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
    MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
    MRJobConfig.JOB_ACL_MODIFY_JOB,
    MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
    ContainerLaunchContext.newInstance(localResources, environment,
      vargsFinal, null, securityTokens, acls);

Collection<String> tagsFromConf =
    jobConf.getTrimmedStringCollection(MRJobConfig.JOB_TAGS);

// Set up the ApplicationSubmissionContext
ApplicationSubmissionContext appContext =
    recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId);                // ApplicationId
appContext.setQueue(                                       // Queue name
    jobConf.get(JobContext.QUEUE_NAME,
    YarnConfiguration.DEFAULT_QUEUE_NAME));
// add reservationID if present
ReservationId reservationID = null;
try {
  reservationID =
      ReservationId.parseReservationId(jobConf
          .get(JobContext.RESERVATION_ID));
} catch (NumberFormatException e) {
  // throw exception as reservationid as is invalid
  String errMsg =
      "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
          + " specified for the app: " + applicationId;
  LOG.warn(errMsg);
  throw new IOException(errMsg);
}
if (reservationID != null) {
  appContext.setReservationID(reservationID);
  LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
      + " to queue:" + appContext.getQueue() + " with reservationId:"
      + appContext.getReservationID());
}
appContext.setApplicationName(                             // Job name
    jobConf.get(JobContext.JOB_NAME,
    YarnConfiguration.DEFAULT_APPLICATION_NAME));
appContext.setCancelTokensWhenComplete(
    conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
appContext.setAMContainerSpec(amContainer);         // AM Container
appContext.setMaxAppAttempts(
    conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);

// set labels for the AM container request if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression
    && amNodelabelExpression.trim().length() != 0) {
  ResourceRequest amResourceRequest =
      recordFactory.newRecordInstance(ResourceRequest.class);
  amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
  amResourceRequest.setResourceName(ResourceRequest.ANY);
  amResourceRequest.setCapability(capability);
  amResourceRequest.setNumContainers(1);
  amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
  appContext.setAMContainerResourceRequest(amResourceRequest);
}
// set labels for the Job containers
appContext.setNodeLabelExpression(jobConf
    .get(JobContext.JOB_NODE_LABEL_EXP));

appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
  appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}

return appContext;

8.ResourceManager收到请求后,通知nodemanager,运行AppMaster,调度器会给nodemanager分配Container,让nodemanager执行AppMaster,
AppMaster启动之后,先向resourcemanager注册自己,resourcemanager,才能知道任务的运行信息,然后向resourcemanager请求分配Container,Container由调度器分配,会得到Container的内存,cpu以及在哪个节点上。

resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000004 of capacity <memory:1024, vCores:1> on host centos-3:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation

AppMaster得到Container信息后指定nodemanager启动Container去运行任务,AppMaster会监控Container任务的运行情况,同时向ResourceManger报告任务信息.
每个nodemanager会向resoucemanager发送心跳信息,resourcemanger会根据所有节点的心跳知道整个集群的资源,这样调度器才能合理的分配Container。
每个AppMaster,任务完成后会向resourcemanage取消注册。这样一个任务就完成了。
9.分析resourcemanager任务日志,分配Container信息全部在resourcemanager节点的日志信息中,这里只截了第一个Container信息。


resourcemanager.ClientRMService: Allocated new applicationId: 5
resourcemanager.ClientRMService: Application with id 5 submitted by user root
resourcemanager.rmapp.RMAppImpl: Storing application with id application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW to NEW_SAVING
resourcemanager.RMAuditLogger: USER=root    IP=172.31.109.168   OPERATION=Submit Application Request    TARGET=ClientRMService  RESULT=SUCCESS  APPID=application_1515076284174_0005
resourcemanager.recovery.RMStateStore: Storing info for app: application_1515076284174_0005
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from NEW_SAVING to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Accepted application application_1515076284174_0005 from user: root, in queue: default, currently num of applications: 1
resourcemanager.rmapp.RMAppImpl: application_1515076284174_0005 State change from SUBMITTED to ACCEPTED
resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1515076284174_0005_000001
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from NEW to SUBMITTED
resourcemanager.scheduler.fair.FairScheduler: Added Application Attempt appattempt_1515076284174_0005_000001 to scheduler from user: root
resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1515076284174_0005_000001 State change from SUBMITTED to SCHEDULED
resourcemanager.rmcontainer.RMContainerImpl: container_1515076284174_0005_01_000001 Container Transitioned from NEW to ALLOCATED
resourcemanager.RMAuditLogger: USER=root    OPERATION=AM Allocated Container    TARGET=SchedulerApp RESULT=SUCCESS  APPID=application_1515076284174_0005    CONTAINERID=container_1515076284174_0005_01_000001
//可以看到container编号的尾号是00001证明这是第一个Container容器,在机器Centos-2上运行。
**resourcemanager.scheduler.SchedulerNode: Assigned container container_1515076284174_0005_01_000001 of capacity <memory:1024, vCores:1> on host centos-2:8041, which has 1 containers, <memory:1024, vCores:1> used and <memory:36, vCores:0> available after allocation**

10.我运行程序的时候有6个map,默认一个reduce,再加上第一个AppMaster运行的Container一共分配了8个Container,第一个Container在Centos-2上分配,其它的7个Container都分配在了Centos-3上了,可以从resourcemanage日志中看出来。可以在Centos-3上验证你的想法


image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容

  • 大家好,我叫王梓旭,要上五年级了。 我不怎么说话。上次老师让我回答问题,我不敢大声回答。 我喜欢小狗,尤其是我家的...
    王梓旭520阅读 279评论 1 0
  • 爱是什么,没有人能解释明白。因为这是一种感觉,一种自然的不能在自然的感觉。 在我看来,秋就是一场童话,一场梦,但却...
    白小源家的燕砸阅读 239评论 0 0
  • 快乐是每一人的专属 悲伤也是每一人所能拥有的 有的人即使在悲伤中也能给其它人带来欢乐 而有的人即使在快乐中也不能给...
    半分微凉阅读 341评论 0 1
  • Hi,你是否有种感觉,长大后的我们偶尔喜欢怀旧,想念那份美好。而当我们憧憬未来时,回忆下过去,也是种享受。 还记得...
    cangelaz阅读 776评论 6 5