Hadoop的Job提交流程简析(二)

程序到了Job.submit(),提交作业的流程就进入了第二个阶段,如果把Hadoop的集群中每一个节点都看做一个岛屿,那么这就是要出海了,涉及到跨节点的操作了。
Job.submit()代码:

  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);//确认没有重复提交
    setUseNewAPI();//根据配置信息确定是否采用新API
    connect();//建立与集群的连接,创建Cluster对象cluster
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

Job.setUserNewAPI()这个函数根据配置文件中的若干配置项确定本作业所采用的是新API还是老API,并生成显式的配置项“mapred.mapper.new-api”和“mapred.reducer.new-api”,并将之写入内存中的配置块。

connect()起到跨节点操作,建立对外联系的作用。创建一个专门提交作业的JobSubmitter.submitJobInternal()方法。当程序从submitJobInternal()返回的时候,作业已经提交完成,所以就到作业状态置为RUNNING。

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

Job.connect()代码:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {//如果cluster尚未创建
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

可见,connect()的作用就是保证节点上有个Cluster类对象,如果还没有,就创建一个。顾名思义,Cluster类应该存有这个集群的信息,也应该知道如何和集群打交道。来看一下Cluster类的代码:

public class Cluster {
  private ClientProtocolProvider clientProtocolProvider;//集群下是YarnClientProtocolProvider
  private ClientProtocol client;//在集群条件下,这是与外界通信的条件和规则
  static { ConfigUtil.loadResources() }//类的静态初始化
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException{}
  ......省略......
}

static { ConfigUtil.loadResources() }通过类的静态初始化装载了一批.xml的配置文件,主要有mapred-default.xml, mapred-site.xml和yarn-default.xml, yarn-site.xml。

整个流程可以简化为:
Job.submit -> job.connect() -> Cluster.cluster() ->Cluster.initialize()

Hadoop运行MapReduce作业的工作原理:


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 目的这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。先决条件请先确认Had...
    SeanC52111阅读 1,762评论 0 1
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,328评论 0 34
  • 终极算法 关注微信号每天收听我们的消息终极算法为您推送精品阅读 前言 Hadoop 在大数据技术体系中的地位至关...
    Yespon阅读 130,296评论 12 168
  • YarnYarn产生背景:Yarn直接来自于MR1.0MR1.0 问题:采用的是master slave结构,ma...
    时待吾阅读 5,816评论 2 23
  • 上周开得正盛的樱花,这周再去探访,已经有些凋谢了,一阵轻风吹过,一阵阵花瓣雨落下,儿子和小伙伴们在花树下挖土筑渠,...
    兔妈妈1503阅读 213评论 1 2