Hadoop Source Code Notes: NameNode Startup Flow

NameNode Startup Flow

Based on the Hadoop 2.7 source. This article works best when read alongside the source code.

A few questions to keep in mind while reading:

  1. How are NameNode fsImage files merged?
  2. How does the NameNode communicate?
  3. Where does the web UI come from?
  4. What conditions trigger HDFS safe mode?

Find the NameNode class and start the walkthrough there.


public static void main(String argv[]) throws Exception {

    // Create the NameNode
    NameNode namenode = createNameNode(argv, null);
}

createNameNode is a long method, so jump straight to its final case, which calls the NameNode constructor:


protected NameNode(Configuration conf, NamenodeRole role) 
    throws IOException { 
    // Initialize
    initialize(conf);
}

Where the core methods live:

  • startHttpServer - web UI
  • loadNamesystem - fsImage merging
  • createRpcServer - the NameNode RPC server
  • startCommonServices - remaining core services

/**
 * Initialize name-node.
 * 
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {

  // Core method 1: the HTTP server
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }

  // Core method 2: merge the fsImage files
  loadNamesystem(conf);
  
  // Core method 3: the NameNode's RPC server
  rpcServer = createRpcServer(conf);

  // Core method 4: start the common services
  startCommonServices(conf);
}

The startHttpServer method:


private void startHttpServer(final Configuration conf) throws IOException {
  // Where the web port 50070 comes from
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

Step into the start method:


/**
 * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
 * for information on the different configuration options and how the
 * HTTP policy is decided.
 */
void start() throws IOException {

  // Build the HttpServer
  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
    
  httpServer = builder.build();
  
  // Register the servlets
  setupServlets(httpServer, conf);  
}

Now look at setupServlets:


private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
      StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("getDelegationToken",
      GetDelegationTokenServlet.PATH_SPEC, 
      GetDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("renewDelegationToken", 
      RenewDelegationTokenServlet.PATH_SPEC, 
      RenewDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("cancelDelegationToken", 
      CancelDelegationTokenServlet.PATH_SPEC, 
      CancelDelegationTokenServlet.class, true);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
      true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
  httpServer.addInternalServlet("listPaths", "/listPaths/*",
      ListPathsServlet.class, false);
  httpServer.addInternalServlet("data", "/data/*",
      FileDataServlet.class, false);
  httpServer.addInternalServlet("checksum", "/fileChecksum/*",
      FileChecksumServlets.RedirectServlet.class, false);
  httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
      ContentSummaryServlet.class, false);
}

With all these servlets registered, let's try listPaths from the web side. Just enter this in a browser:

http://hdfs:50070/listPaths/user/hive
HttpServer flow diagram (image not included in this note).

The result (reformatted here) is shown below; the data speaks for itself. Feel free to try the other servlets one by one.


<?xml version="1.0" encoding="UTF-8"?>
<listing filter=".*" path="/user/hive" exclude="" time="2020-10-22T08:41:43+0000" version="2.6.0-cdh5.14.4" recursive="no">
    <directory path="/user/hive" modified="2019-03-04T05:03:51+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwxrwxr-t" owner="hive" group="hive"/>
    <directory path="/user/hive/.staging" modified="2019-03-06T10:43:09+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwx------" owner="hive" group="hive"/>
    <file path="/user/hive/hive-udf" modified="2019-03-01T06:52:26+0000" accesstime="2020-10-21T07:42:08+0000" size="6246" replication="3" blocksize="134217728" permission="-rw-rw-r--" owner="hdfs" group="hive"/>
    <directory path="/user/hive/warehouse" modified="2020-08-25T09:12:27+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwxrwxrwt" owner="hive" group="hive"/>
</listing>
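The listing above is plain XML, so it is easy to consume programmatically. As a small illustration (not Hadoop code; the sample XML is inlined and trimmed for brevity), a DOM parse that counts the directory entries:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public class ListPathsParser {

    /** Count the <directory> entries in a listPaths response; -1 on malformed XML. */
    static int countDirectories(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getElementsByTagName("directory").getLength();
        } catch (Exception e) {   // sketch: signal parse failure with -1
            return -1;
        }
    }

    public static void main(String[] args) {
        // Trimmed version of the listing shown above
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<listing path=\"/user/hive\">"
            + "<directory path=\"/user/hive\"/>"
            + "<directory path=\"/user/hive/.staging\"/>"
            + "<file path=\"/user/hive/hive-udf\" size=\"6246\"/>"
            + "<directory path=\"/user/hive/warehouse\"/>"
            + "</listing>";
        System.out.println(countDirectories(xml)); // 3 directories in the sample
    }
}
```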

Now back to the second core method.

The loadNamesystem method, which bottoms out in FSNamesystem.loadFromDisk (shown abridged):


static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);

  long loadStart = monotonicNow();
  try {
    // Load the fsImage
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  return namesystem;
}

Continuing one level down, this method lays out the fsImage merge flow. Since the cluster is only just starting, we will skim the major steps here and revisit this method later:

  1. Step 1: merge metadata: fsImage + editLog = new fsImage
  2. Step 2: save the new fsImage to disk
  3. Step 3: open a fresh editLog for writing

private void loadFSImage(StartupOption startOpt) throws IOException {

    // Step 1: merge metadata: fsImage + editLog = new fsImage
    fsImage.recoverTransitionRead(startOpt, this, recovery);
    // Step 2: save the new fsImage to disk
    fsImage.saveNamespace(this);
    // Step 3: open a fresh editLog for writing
    fsImage.openEditLogForWrite();
}
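The three steps can be modeled with a toy checkpoint. This is emphatically not HDFS code, just a sketch of "replay the edit log onto the last image to get a new image" (step 1); steps 2 and 3 would persist the result and truncate the log:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyCheckpoint {

    /** Step 1: replay edit-log ops onto the last fsImage, producing a new image. */
    static Map<String, String> merge(Map<String, String> fsImage, List<String[]> editLog) {
        Map<String, String> merged = new HashMap<>(fsImage);
        for (String[] op : editLog) {                       // op = {action, path, type}
            if ("mkdir".equals(op[0]) || "create".equals(op[0])) {
                merged.put(op[1], op[2]);
            } else if ("delete".equals(op[0])) {
                merged.remove(op[1]);
            }
        }
        return merged;
    }

    static String demo() {
        Map<String, String> fsImage = new HashMap<>();
        fsImage.put("/user", "dir");                        // last checkpoint

        List<String[]> editLog = new ArrayList<>();
        editLog.add(new String[]{"mkdir", "/user/hive", "dir"});
        editLog.add(new String[]{"delete", "/user", null});

        // fsImage + editLog = new fsImage; steps 2/3 would save it and open a new log
        return merge(fsImage, editLog).keySet().toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [/user/hive]
    }
}
```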

Next up is another core method: the RPC server (some familiarity with Hadoop RPC helps here).


public NameNodeRpcServer(Configuration conf, NameNode nn)
    throws IOException {

  // serviceRpcServer
  this.serviceRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(serviceRpcAddr.getPort())
      .setNumHandlers(serviceHandlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();

  // clientRpcServer
  this.clientRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(rpcAddr.getPort())
      .setNumHandlers(handlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();
}
  1. serviceRpcServer
    Handles RPC requests sent by DataNodes.
  2. clientRpcServer
    Handles RPC requests sent by clients.

So the NameNode starts two RPC servers at this point.
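Why two servers? Splitting DataNode traffic onto its own server keeps heartbeats and block reports from queueing behind heavy client load. A rough illustration of that isolation using two plain thread pools (this is an analogy with made-up pool sizes, not the real RPC handler machinery):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DualPoolSketch {

    /** Flood the "client" pool, then show "service" work still completes promptly. */
    static int handledHeartbeats() {
        ExecutorService servicePool = Executors.newFixedThreadPool(2); // serviceRpcServer analogue
        ExecutorService clientPool  = Executors.newFixedThreadPool(2); // clientRpcServer analogue
        AtomicInteger heartbeats = new AtomicInteger();

        // Flood the client pool with slow requests...
        for (int i = 0; i < 100; i++) {
            clientPool.submit(() -> {
                try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            });
        }
        // ...heartbeats on the separate service pool are unaffected.
        for (int i = 0; i < 10; i++) {
            servicePool.submit(() -> { heartbeats.incrementAndGet(); });
        }

        servicePool.shutdown();
        try {
            servicePool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) { }
        clientPool.shutdownNow();
        return heartbeats.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats handled: " + handledHeartbeats()); // 10
    }
}
```

With a single shared pool, the 100 slow client calls would delay all 10 heartbeats; with separate pools they do not.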

RPC server flow diagram (image not included in this note).

One last method remains:


/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
  namesystem.startCommonServices(conf, haContext);
}

It contains three very important calls:


/** 
 * Start services common to both active and standby states
 */
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    
    // Check available resources
    nnResourceChecker = new NameNodeResourceChecker(conf);
    checkAvailableResources();
    // HDFS safe mode
    setBlockTotal();
    // Start key services (including the heartbeat monitor)
    blockManager.activate(conf);  
}

First, the NameNodeResourceChecker constructor:


/**
 * Create a NameNodeResourceChecker, which will check the edits dirs and any
 * additional dirs to check set in <code>conf</code>.
 */
public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();

  // Reserved-space threshold for the remaining disk (default 100 MB)
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
  
  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
  
  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // Add all the local edits dirs, marking some as required if they are
  // configured as such.
  // Add the directories whose disks need monitoring
  // localEditDirs comes from hdfs-site.xml / core-site.xml
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }

  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }
  
  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

Also worth a look is addDirToCheck, which mainly populates the volumes map:


/**
 * Add the volume of the passed-in directory to the list of volumes to check.
 * If <code>required</code> is true, and this volume is already present, but
 * is marked redundant, it will be marked required. If the volume is already
 * present but marked required then this method is a no-op.
 * 
 * @param directoryToCheck
 *          The directory whose volume will be checked for available space.
 */
private void addDirToCheck(URI directoryToCheck, boolean required)
    throws IOException {
  File dir = new File(directoryToCheck.getPath());
  if (!dir.exists()) {
    throw new IOException("Missing directory "+dir.getAbsolutePath());
  }
  
  // Each directory becomes one CheckedVolume
  CheckedVolume newVolume = new CheckedVolume(dir, required);
  // volumes holds every path that needs checking
  CheckedVolume volume = volumes.get(newVolume.getVolume());
  if (volume == null || !volume.isRequired()) {
    volumes.put(newVolume.getVolume(), newVolume);
  }
}
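The effect of the `volume == null || !volume.isRequired()` guard is that a required marking, once set, is never downgraded to redundant. A minimal sketch of that merge rule over a plain Map (toy types, not Hadoop's CheckedVolume):

```java
import java.util.HashMap;
import java.util.Map;

public class VolumeMergeRule {

    /** Merge a (volume, required) pair in, never downgrading required -> redundant. */
    static void addDirToCheck(Map<String, Boolean> volumes, String volume, boolean required) {
        Boolean existing = volumes.get(volume);
        if (existing == null || !existing) {   // absent, or present but only redundant
            volumes.put(volume, required);
        }
        // present and already required: no-op, as in the original javadoc
    }

    static boolean demo() {
        Map<String, Boolean> volumes = new HashMap<>();
        addDirToCheck(volumes, "/data", true);   // marked required
        addDirToCheck(volumes, "/data", false);  // a redundant add must not downgrade it
        return volumes.get("/data");             // still required
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```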

Continuing from before, the checkAvailableResources method:


/**
 * Perform resource checks and cache the results.
 */
void checkAvailableResources() {
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}

Moving on; keep in mind the volumes map that was populated earlier:


/**
 * Return true if disk space is available on at least one of the configured
 * redundant volumes, and all of the configured required volumes.
 * 
 * @return True if the configured amount of disk space is available on at
 *         least one redundant volume and all of the required volumes, false
 *         otherwise.
 */
public boolean hasAvailableDiskSpace() {
  // volumes holds the directories that need checking
  return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}

Onward:


/**
 * Return true if and only if there are sufficient NN
 * resources to continue logging edits.
 * 
 * @param resources the collection of resources to check.
 * @param minimumRedundantResources the minimum number of redundant resources
 *        required to continue operation.
 * @return true if and only if there are sufficient NN resources to
 *         continue logging edits.
 */
static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;
  // Iterate over every directory
  for (CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      // Does this volume have enough disk space?
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      // Does this volume have enough disk space?
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resource is not available.
        return false;
      }
    }
  }
  
  if (redundantResourceCount == 0) {
    // If there are no redundant resources, return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount - disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}
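The policy above can be exercised in isolation. Here is a condensed reimplementation over plain (required, available) boolean pairs, just to make the short-circuit and counting behavior visible (a sketch, not the Hadoop class itself):

```java
public class ResourcePolicySketch {

    // resources[i] = {required?, available?}
    static boolean areResourcesAvailable(boolean[][] resources, int minimumRedundant) {
        int requiredCount = 0, redundantCount = 0, disabledRedundant = 0;
        for (boolean[] r : resources) {
            boolean required = r[0], available = r[1];
            if (!required) {
                redundantCount++;
                if (!available) disabledRedundant++;
            } else {
                requiredCount++;
                if (!available) return false;   // any required volume low => fail fast
            }
        }
        if (redundantCount == 0) {
            // No redundant volumes configured: pass if any required ones exist
            return requiredCount > 0;
        }
        return redundantCount - disabledRedundant >= minimumRedundant;
    }

    public static void main(String[] args) {
        // One required volume low => false, regardless of the redundant ones
        System.out.println(areResourcesAvailable(
            new boolean[][]{{true, false}, {false, true}}, 1)); // false
        // Two redundant volumes, one low, minimum 1 => still true
        System.out.println(areResourcesAvailable(
            new boolean[][]{{false, true}, {false, false}}, 1)); // true
    }
}
```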

Now look at isResourceAvailable. It is declared on an interface (CheckableNameNodeResource), so we need the implementation, which lives in NameNodeResourceChecker's inner class CheckedVolume:


@Override
public boolean isResourceAvailable() {
  // Space currently available on this volume
  long availableSpace = df.getAvailable();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume '" + volume + "' is "
        + availableSpace);
  }
  // Compare against duReserved (remember this value? It was initialized earlier; default 100 MB)
  if (availableSpace < duReserved) {
    LOG.warn("Space available on volume '" + volume + "' is "
        + availableSpace +
        ", which is below the configured reserved amount " + duReserved);
    return false;
  } else {
    return true;
  }
}

So a little arithmetic happens here: if less than 100 MB remains available, the method returns false. Back at the call site:


/**
 * Perform resource checks and cache the results.
 */
void checkAvailableResources() {
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");
  // ends up false whenever disk space is insufficient
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
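Stripped of logging, the per-volume check is just a comparison against duReserved, whose default of 100 MB comes from dfs.namenode.resource.du.reserved. As a pure function:

```java
public class DiskCheckSketch {

    static final long DU_RESERVED_DEFAULT = 100L * 1024 * 1024; // 100 MB default

    /** True when the volume still has at least duReserved bytes free. */
    static boolean isResourceAvailable(long availableSpace, long duReserved) {
        return availableSpace >= duReserved;
    }

    public static void main(String[] args) {
        System.out.println(isResourceAvailable(200L * 1024 * 1024, DU_RESERVED_DEFAULT)); // true
        System.out.println(isResourceAvailable( 50L * 1024 * 1024, DU_RESERVED_DEFAULT)); // false
    }
}
```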

Back where we were; the next method is setBlockTotal:


/**
 * Set the total number of blocks in the system. 
 */
public void setBlockTotal() {
  // safeMode is volatile, and may be set to null at any time
  SafeModeInfo safeMode = this.safeMode;
  if (safeMode == null)
    return;
  // Set up safe mode
  // getCompleteBlocksTotal() returns the number of blocks that are fully usable
  safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}

First we need the value from getCompleteBlocksTotal:


/**
 * Get the total number of COMPLETE blocks in the system.
 * For safe mode only complete blocks are counted.
 *
 * HDFS has two kinds of blocks:
 * 1) COMPLETE: normal, usable blocks
 * 2) UNDER_CONSTRUCTION: blocks still being written
 */
private long getCompleteBlocksTotal() {
  // Calculate number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // Number of blocks still under construction
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    // total blocks - blocks under construction = usable blocks
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock();
  }
}

Next, the setBlockTotal(int) overload:


/**
 * Set total number of blocks.
 */
private synchronized void setBlockTotal(int total) { // suppose there are 1000 usable blocks
  this.blockTotal = total;
  // threshold = 0.999f
  // so blockThreshold = 999: with 1000 blocks, 999 reported blocks counts as healthy
  this.blockThreshold = (int) (blockTotal * threshold);
  // also 999
  this.blockReplQueueThreshold = 
    (int) (blockTotal * replQueueThreshold);
  if (haEnabled) {
    // After we initialize the block count, any further namespace
    // modifications done while in safe mode need to keep track
    // of the number of total blocks in the system.
    this.shouldIncrementallyTrackBlocks = true;
  }
  if(blockSafe < 0)
    this.blockSafe = 0;
  // Check whether safe mode is needed
  checkMode();
}
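The arithmetic is worth checking by hand. With the default dfs.namenode.safemode.threshold-pct of 0.999f, 1000 blocks give a threshold of 999, and the int cast truncates on small totals too:

```java
public class SafeModeThresholdSketch {

    static final float THRESHOLD_PCT_DEFAULT = 0.999f; // dfs.namenode.safemode.threshold-pct

    /** blockThreshold = (int) (blockTotal * threshold), as in setBlockTotal above. */
    static int blockThreshold(int blockTotal, float threshold) {
        return (int) (blockTotal * threshold);
    }

    public static void main(String[] args) {
        // 1000 total blocks -> 999 must be reported before condition 1 of needEnter clears
        System.out.println(blockThreshold(1000, THRESHOLD_PCT_DEFAULT)); // 999
        // small totals: the cast truncates, e.g. 10 blocks -> threshold 9
        System.out.println(blockThreshold(10, THRESHOLD_PCT_DEFAULT)); // 9
    }
}
```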

Into the core check:


/**
 * Check and trigger safe mode if needed. 
 */
private void checkMode() {
  // Have to have write-lock since leaving safemode initializes
  // repl queues, which requires write lock
  assert hasWriteLock();
  if (inTransitionToActive()) {
    return;
  }
  // if smmthread is already running, the block threshold must have been 
  // reached before, there is no need to enter the safe mode again
  // needEnter decides whether to enter safe mode -- the core check
  if (smmthread == null && needEnter()) {
    // Enter safe mode
    enter();
    return;
  }
}

needEnter decides whether HDFS should enter safe mode.

There are three conditions:

  1. threshold != 0 && blockSafe < blockThreshold
    The HDFS metadata records the total block count from the last time the cluster was shut down.
    Suppose there were 1000 COMPLETE blocks; at the 0.999 ratio, 999 blocks are required:
    threshold = 0.999
    blockThreshold = 999
    blockSafe is the total number of blocks the DataNodes have reported to the NameNode.
    If the reported total is below the threshold (999), the NameNode enters safe mode.
  2. datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold
    datanodeThreshold is the minimum number of live DataNodes; its default is 0.
    If this threshold is configured and the number of live DataNodes falls below it, the NameNode enters safe mode.
  3. !nameNodeHasResourcesAvailable()
    This reuses the disk check computed earlier: with less than 100 MB available, the NameNode enters safe mode.

/** 
 * There is no need to enter safe mode 
 * if DFS is empty or {@link #threshold} == 0
 *
 * The three conditions:
 * 1) threshold (0.999) != 0 && blockSafe (blocks reported by the cluster) < blockThreshold (999)
 * 2) datanodeThreshold (required live DataNodes, default 0) != 0 && getNumLiveDataNodes() (live DataNodes) < datanodeThreshold
 * 3) !nameNodeHasResourcesAvailable() (computed earlier; false below the 100 MB reserve)
 */
private boolean needEnter() {
  return (threshold != 0 && blockSafe < blockThreshold) ||
    (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
    (!nameNodeHasResourcesAvailable());
}
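Each of the three conditions can be exercised independently. Below is a direct transcription of needEnter over plain parameters; the numbers fed to it are hypothetical, not read from a real cluster:

```java
public class NeedEnterSketch {

    static boolean needEnter(float threshold, long blockSafe, long blockThreshold,
                             int datanodeThreshold, int numLiveDataNodes,
                             boolean resourcesAvailable) {
        return (threshold != 0 && blockSafe < blockThreshold)
            || (datanodeThreshold != 0 && numLiveDataNodes < datanodeThreshold)
            || !resourcesAvailable;
    }

    public static void main(String[] args) {
        // 1) only 998 of the required 999 blocks reported -> safe mode
        System.out.println(needEnter(0.999f, 998, 999, 0, 5, true));  // true
        // 2) live DataNodes below a configured datanodeThreshold -> safe mode
        System.out.println(needEnter(0.999f, 999, 999, 3, 2, true));  // true
        // 3) NameNode disk below the 100 MB reserve -> safe mode
        System.out.println(needEnter(0.999f, 999, 999, 0, 5, false)); // true
        // all healthy -> stay out of safe mode
        System.out.println(needEnter(0.999f, 999, 999, 0, 5, true));  // false
    }
}
```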

So where does the value of blockSafe come from?


/**
 * Increment number of safe blocks if current block has 
 * reached minimal replication.
 * @param replication current replication 
 */
private synchronized void incrementSafeBlockCount(short replication) {
  // Called as DataNodes block-report to the NameNode
  if (replication == safeReplication) {
    // Starts at 0; incremented once per block, accumulating the complete blocks across all DataNodes
    this.blockSafe++;

    // Report startup progress only if we haven't completed startup yet.
    StartupProgress prog = NameNode.getStartupProgress();
    if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
      if (this.awaitingReportedBlocksCounter == null) {
        this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
          STEP_AWAITING_REPORTED_BLOCKS);
      }
      this.awaitingReportedBlocksCounter.increment();
    }

    checkMode();
  }
}
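Putting the pieces together: every report bumps blockSafe and re-runs checkMode(), and once blockSafe reaches blockThreshold, condition 1 of needEnter no longer holds. A toy trace of that accumulation (a sketch with made-up numbers, not the real reporting path):

```java
public class SafeModeTraceSketch {

    /** How many single-block reports until blockSafe reaches the threshold? */
    static int reportsUntilThreshold(int blockTotal, float threshold) {
        int blockThreshold = (int) (blockTotal * threshold);
        int blockSafe = 0;
        int reports = 0;
        while (blockSafe < blockThreshold) {   // needEnter() condition 1 still true
            blockSafe++;                       // one incrementSafeBlockCount() call
            reports++;
        }
        return reports;
    }

    public static void main(String[] args) {
        // With 1000 blocks at threshold 0.999f, the 999th report clears condition 1
        System.out.println(reportsUntilThreshold(1000, 0.999f)); // 999
    }
}
```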

That wraps up the NameNode startup flow.
