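Hadoop Source Code Notes: DataNode Startup Flow

DataNode Startup Flow

Based on the Hadoop 2.7 source code. This article works best when read alongside the source.

Find the entry point (DataNode's main method)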

public static void main(String args[]) {
  // Entry point
  secureMain(args, null);
}

Enter the secureMain method

public static void secureMain(String args[], SecureResources resources) {
    // As the name suggests, this creates the DataNode
    DataNode datanode = createDataNode(args, null, resources);
}

Enter the createDataNode method

/** Instantiate & Start a single datanode daemon and wait for it to finish.
 *  If this thread is specifically interrupted, it will stop waiting.
 */
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
}

Enter the instantiateDataNode method

/** Instantiate a single datanode object, along with its secure resources. 
 * This must be run by invoking{@link DataNode#runDatanodeDaemon()} 
 * subsequently. 
 */
public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  // Create the instance (dataLocations is read from conf in code omitted here)
  return makeInstance(dataLocations, conf, resources);
}

Enter the makeInstance method

/**
 * Make an instance of DataNode after ensuring that at least one of the
 * given data directories (and their parent directories, if necessary)
 * can be created.
 * @param dataDirs List of directories, where the new DataNode instance should
 * keep its files.
 * @param conf Configuration instance to use.
 * @param resources Secure resources needed to run under Kerberos
 * @return DataNode instance for given list of data dirs and conf, or null if
 * no directory from this directory list can be created.
 * @throws IOException
 */
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
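  // Construct the new DataNode object (locations holds the data directories that passed the checks omitted here)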
  return new DataNode(conf, locations, resources);
}

Enter the DataNode constructor

/**
 * Create the DataNode given a configuration, an array of dataDirs,
 * and a namenode proxy
 */
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final SecureResources resources) throws IOException {
    // Start the DataNode
    startDataNode(conf, dataDirs, resources);
}

Enter the startDataNode method
It calls several methods; the ones I consider most important are picked out and explained below.

/**
 * This method starts the data node with the specified conf.
 * 
 * @param conf - the configuration
 *  if conf's CONFIG_PROPERTY_SIMULATED property is set
 *  then a simulated storage based data node is created.
 * 
 * @param dataDirs - only for a non-simulated storage data node
 * @throws IOException
 */
void startDataNode(Configuration conf, 
                   List<StorageLocation> dataDirs,
                   SecureResources resources
                   ) throws IOException {
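  // Important method 1: register the JMX MXBean
  registerMXBean();
  // Important method 2: set up the DataXceiver server for block data transfer
  initDataXceiver(conf);
  // Important method 3: HTTP server
  startInfoServer(conf);
  // Important method 4: RPC server
  initIpcServer(conf);
  
  // Create the BlockPoolManager. A cluster normally has a single block pool (federation is the exception)
  blockPoolManager = new BlockPoolManager(this);
  
  // Important method 5: registration and heartbeat
  blockPoolManager.refreshNamenodes(conf);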
}

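The 5 core methods

  1. registerMXBean()
  2. initDataXceiver(conf)
  3. startInfoServer(conf) - HTTP server (data)
  4. initIpcServer(conf) - RPC server (communication)
  5. refreshNamenodes(conf) - registration and heartbeat

HTTP Server

There is nothing difficult in this part. The DataNode has its own HttpServer, and a few servlets are added to it. If you are interested, look at the doGet method of the classes passed to addInternalServlet to see what each servlet actually does.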

/**
 * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
 * for information related to the different configuration options and
 * Http Policy is decided.
 */
private void startInfoServer(Configuration conf)
  throws IOException {
  Configuration confForInfoServer = new Configuration(conf);
  confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
  // Initialize the HttpServer builder
  HttpServer2.Builder builder = new HttpServer2.Builder()
    .setName("datanode")
    .setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
    .addEndpoint(URI.create("http://localhost:0"))
    .setFindPort(true);
  // Build the HTTP server
  this.infoServer = builder.build();
  // Add servlets
  this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
      FileChecksumServlets.GetServlet.class);
}

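RPC Server

RPC is an important prerequisite for understanding the Hadoop source code. If it is unfamiliar, look at a simple RPC application first.

private void initIpcServer(Configuration conf) throws IOException {
  // Build the DataNode's RPC server (note the builder design pattern here)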
  ipcServer = new RPC.Builder(conf)
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
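      .setSecretManager(blockPoolTokenSecretManager).build();
}

The DataNode also starts an RPC server. What it is for will come up later when it is used; for now, just remember that the DataNode has its own RPC server (for communication).

Registration and Heartbeat

This part is one of the cores of the DataNode, and registration plus heartbeat is a cornerstone of distributed systems in general.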

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;
  
  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }
    
    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));
    
    assert toRefresh.size() + toAdd.size() ==
      addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
        "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
        "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));
    
      for (String nsToAdd : toAdd) {
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToAdd).values());
        BPOfferService bpos = createBPOS(addrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
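  }
  // ... (the remaining steps, which shut down the toRemove nameservices and refresh the toRefresh ones, are omitted)
}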
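The classification done in Steps 1 and 2 can be reproduced with plain java.util collections. This is a minimal sketch rather than the Hadoop code: the class NameserviceDiff and its diff method are hypothetical names used only for illustration, and the real method relies on Guava's Sets helpers instead.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper that mirrors Steps 1 and 2 of doRefreshNamenodes. */
final class NameserviceDiff {
  final Set<String> toAdd = new LinkedHashSet<>();
  final Set<String> toRefresh = new LinkedHashSet<>();
  final Set<String> toRemove = new LinkedHashSet<>();

  /**
   * @param current    nameservice IDs the DataNode already serves
   * @param configured nameservice IDs found in the new configuration
   */
  static NameserviceDiff diff(Set<String> current, Set<String> configured) {
    NameserviceDiff d = new NameserviceDiff();
    // Step 1: a configured nameservice is either already known or new
    for (String ns : configured) {
      if (current.contains(ns)) {
        d.toRefresh.add(ns);
      } else {
        d.toAdd.add(ns);
      }
    }
    // Step 2: anything served now but absent from the configuration goes away
    d.toRemove.addAll(current);
    d.toRemove.removeAll(configured);
    return d;
  }
}
```

With current = {ns1, ns2} and configured = {ns2, ns3}, the sketch puts ns3 in toAdd, ns2 in toRefresh and ns1 in toRemove, and toAdd plus toRefresh always has the same size as the configured set, which is what the assert in the original code checks.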

The code before this mostly parses the configuration, so we go straight to the important method, startAll

synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            // Key class (BPOfferService): one BPOfferService is created for each nameservice in the federation
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}

Next, look at the bpos.start method

void start() {
  // BPServiceActor: one BPServiceActor is created for each NameNode
  for (BPServiceActor actor : bpServices) {
    // start() is called for every NameNode, that is, for both the active and the standby
    actor.start();
  }
}

Then step into the actor's start method

//This must be called only by BPOfferService
void start() {
  if ((bpThread != null) && (bpThread.isAlive())) {
    //Thread is started already
    return;
  }
  bpThread = new Thread(this, formatThreadName());
  bpThread.setDaemon(true); // needed for JUnit testing
  bpThread.start();
}
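The guard at the top of start() makes the call idempotent: a second call while the thread is alive does nothing. A minimal sketch of that idea follows. ActorSketch is a hypothetical stand-in for BPServiceActor, the latch replaces the real handshake and heartbeat loops, and the synchronized keyword is my addition to keep the sketch safe on its own.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for BPServiceActor; only the start() guard is modeled. */
final class ActorSketch implements Runnable {
  private final CountDownLatch stop = new CountDownLatch(1);
  private Thread worker;
  private int threadsCreated;

  synchronized void start() {
    if (worker != null && worker.isAlive()) {
      return; // thread is started already, nothing to do
    }
    worker = new Thread(this, "actor-sketch");
    worker.setDaemon(true);
    threadsCreated++;
    worker.start();
  }

  @Override
  public void run() {
    try {
      stop.await(); // stands in for the handshake and heartbeat loops
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  synchronized int threadsCreated() {
    return threadsCreated;
  }

  /** Lets the worker finish and waits for it. */
  void shutdown() {
    stop.countDown();
    Thread t;
    synchronized (this) {
      t = worker;
    }
    if (t == null) {
      return;
    }
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Calling start() twice on the same ActorSketch creates a single thread, because the worker is still alive when the second call arrives.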

A thread is started here, so go straight to its run method

/**
 * No matter what kind of exception we get, keep retrying to offerService().
 * That's the loop that connects to the NameNode and provides basic DataNode
 * functionality.
 *
 * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
 * happen either at shutdown or due to refreshNamenodes.
 */
@Override
public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // Important method 1: fetch the namespace info and register this DataNode. A very important method
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }

    runningState = RunningState.RUNNING;

    while (shouldRun()) {
      try {
        // Important method 2: heartbeat
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception in BPOfferService for " + this, ex);
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    LOG.warn("Unexpected exception in block pool " + this, ex);
    runningState = RunningState.FAILED;
  } finally {
    LOG.warn("Ending block pool service for: " + this);
    cleanUp();
  }
}

One block of this code is worth analyzing

while (true) {
  // init stuff
  try {
    // setup storage
    connectToNNAndHandshake();
    break;
  } catch (IOException ioe) {
    // Initial handshake, storage recovery or registration failed
    runningState = RunningState.INIT_FAILED;
    if (shouldRetryInit()) {
      // Retry until all namenode's of BPOS failed initialization
      LOG.error("Initialization failed for " + this + " "
          + ioe.getLocalizedMessage());
      sleepAndLogInterrupts(5000, "initializing");
    } else {
      runningState = RunningState.FAILED;
      LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
      return;
    }
  }
}

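connectToNNAndHandshake is where the DataNode fetches the namespace info from the NameNode and registers itself, so it is a very important method. The surrounding code can be understood simply as a sensible use of while (true) to make sure the important code eventually runs: the loop exits only on success (break) or when shouldRetryInit() says to give up (return).

while (true) {
  try {
    // Core code
    connectToNNAndHandshake();
    break;
  } catch (IOException ioe) {
    // ... (retry handling omitted)
  }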
}
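The same retry-until-success pattern can be written as a small standalone helper. This is a minimal sketch under my own assumptions: RetryUntilSuccess, IoAction and the maxAttempts limit are hypothetical, whereas the Hadoop loop decides whether to keep going through shouldRetryInit() and always sleeps 5000 ms.

```java
import java.io.IOException;

/** Hypothetical helper illustrating the while (true) + break retry pattern. */
final class RetryUntilSuccess {
  /** An action that may fail with an IOException, like connectToNNAndHandshake. */
  interface IoAction {
    void run() throws IOException;
  }

  /** Runs the action until it succeeds; returns the number of attempts used. */
  static int run(IoAction action, int maxAttempts, long sleepMillis)
      throws IOException {
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        action.run();
        break; // success: leave the retry loop
      } catch (IOException ioe) {
        if (attempts >= maxAttempts) {
          throw ioe; // give up, similar in spirit to the FAILED branch
        }
        try {
          Thread.sleep(sleepMillis); // back off before the next attempt
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("interrupted while retrying", ie);
        }
      }
    }
    return attempts;
  }
}
```

An action that fails twice and then succeeds returns 3 from run(...), provided maxAttempts is at least 3.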

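These are the 2 major core methods. The source analysis that follows assumes an understanding of Hadoop RPC.

DataNode Registration

private void connectToNNAndHandshake() throws IOException {
  // Important method 1: obtain the NameNode proxy, which everything below depends on
  // Calls on bpNamenode end up in NameNodeRpcServer on the NameNode side
  bpNamenode = dn.connectToNN(nnAddr);

  // Important method 2: fetch the NameNode's namespace info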
  NamespaceInfo nsInfo = retrieveNamespaceInfo();
  
  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(nsInfo);
  
  // Important method 3: register
  register(nsInfo);
}

Obtaining the NameNode proxy
The RPC.getProtocolProxy method obtains a proxy for the NameNode, and that proxy is what ends up in bpNamenode

private static DatanodeProtocolPB createNamenode(
    InetSocketAddress nameNodeAddr, Configuration conf,
    UserGroupInformation ugi) throws IOException {
  return RPC.getProtocolProxy(DatanodeProtocolPB.class,
      RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
      conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
      org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
}
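For readers new to the proxy idea, the JDK's dynamic proxies show the mechanism in miniature. The sketch below is not Hadoop RPC: ProxySketch, VersionProtocol and ServerImpl are hypothetical names, and the handler forwards the call in-process where a real RPC client would serialize it and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical, in-process illustration of a protocol proxy. */
final class ProxySketch {
  /** Protocol interface shared by the client and the server. */
  interface VersionProtocol {
    String versionRequest();
  }

  /** Server-side implementation, playing the role of NameNodeRpcServer. */
  static final class ServerImpl implements VersionProtocol {
    @Override
    public String versionRequest() {
      return "namespace-info-from-server";
    }
  }

  /** Builds a client-side proxy; the handler stands in for the network hop. */
  static VersionProtocol connect(final VersionProtocol server) {
    InvocationHandler handler =
        (proxy, method, args) -> method.invoke(server, args);
    return (VersionProtocol) Proxy.newProxyInstance(
        VersionProtocol.class.getClassLoader(),
        new Class<?>[] {VersionProtocol.class},
        handler);
  }
}
```

The caller only holds a VersionProtocol reference and never sees ServerImpl, which is why stepping into a proxied call in a debugger does not lead to the server-side code.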

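Fetching the NameNode's namespace info
The namespace info lives on the NameNode, so for the DataNode to get it, the DataNode has to communicate with the NameNode. This is where RPC comes in.

@Override
public NamespaceInfo versionRequest() throws IOException {
  try {
    // Core call: rpcProxy.versionRequest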
    return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
        VOID_VERSION_REQUEST).getInfo());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}

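At this point you cannot step into rpcProxy.versionRequest, because rpcProxy is a proxy for the NameNode. The actual method must therefore live in the NameNode's RPC service, which is the NameNodeRpcServer class. Searching that class directly turns up the following code

@Override // DatanodeProtocol, NamenodeProtocol
public NamespaceInfo versionRequest() throws IOException {
  // Check that the NameNode has started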
  checkNNStartup();
  namesystem.checkSuperuserPrivilege();
  return namesystem.getNamespaceInfo();
}

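To see what the returned namespace info contains, look at the namesystem.getNamespaceInfo method.
With the proxy and the namespace info in hand, the DataNode can now register itself.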

/**
 * Register one bp with the corresponding NameNode
 * <p>
 * The bpDatanode needs to register with the namenode on startup in order
 * 1) to report which storage it is serving now and 
 * 2) to receive a registrationID
 *  
 * issued by the namenode to recognize registered datanodes.
 * 
 * @param nsInfo current NamespaceInfo
 * @see FSNamesystem#registerDatanode(DatanodeRegistration)
 * @throws IOException
 */
void register(NamespaceInfo nsInfo) throws IOException {
  // The handshake() phase loaded the block pool storage
  // off disk - so update the bpRegistration object from that info
  bpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Core code: register this DataNode with the NameNode
      bpRegistration = bpNamenode.registerDatanode(bpRegistration);
      bpRegistration.setNamespaceInfo(nsInfo);
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
}

Continuing down into bpNamenode.registerDatanode

@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
    ) throws IOException {
  RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
      .newBuilder().setRegistration(PBHelper.convert(registration));
  RegisterDatanodeResponseProto resp;
  try {
    // Core code: register the DataNode over RPC
    resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return PBHelper.convert(resp.getRegistration());
}

A proxy again, so the same approach applies: go straight to NameNodeRpcServer and find the registerDatanode method

@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  // Check that the NameNode has started
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}

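The registerDatanode method that finally does the work (in DatanodeManager, reached through namesystem) is long, so only its core calls are listed here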

/**
 * Register the given datanode with the namenode. NB: the given
 * registration is mutated and given back to the datanode.
 *
 * @param nodeReg the datanode registration
 * @throws DisallowedDatanodeException if the registration request is
 *    denied because the datanode does not match includes/excludes
 * @throws UnresolvedTopologyException if the registration request is 
 *    denied because resolving datanode network location fails.
 */
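public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
    // ... (validation and the construction of nodeDescr are omitted)
    // register new datanode
    // Add the DataNode
    addDatanode(nodeDescr);
    // also treat the registration message as a heartbeat
    // no need to update its timestamp
    // because its is done when the descriptor is created
    // Also create the heartbeat record for this DataNode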
    heartbeatManager.addDatanode(nodeDescr);
}
/** Add a datanode. */
void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove  from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(datanodeMap) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  // Add the DataNode to the host map
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);

  if (LOG.isDebugEnabled()) {
    LOG.debug(getClass().getSimpleName() + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}
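The line host2DatanodeMap.remove(datanodeMap.put(...)) relies on Map.put returning the entry it replaced. The sketch below shows that trick with two plain maps. NodeRegistrySketch and Node are hypothetical, and the host index here is a simple map of sets rather than Hadoop's Host2NodesMap.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical registry that keeps a UUID index and a host index in sync. */
final class NodeRegistrySketch {
  static final class Node {
    final String uuid;
    final String host;

    Node(String uuid, String host) {
      this.uuid = uuid;
      this.host = host;
    }
  }

  private final Map<String, Node> byUuid = new HashMap<>();
  private final Map<String, Set<Node>> byHost = new HashMap<>();

  synchronized void add(Node node) {
    // put returns the node previously stored under this UUID, or null
    Node previous = byUuid.put(node.uuid, node);
    if (previous != null) {
      // drop the stale entry from the host index before adding the new one
      Set<Node> old = byHost.get(previous.host);
      if (old != null) {
        old.remove(previous);
        if (old.isEmpty()) {
          byHost.remove(previous.host);
        }
      }
    }
    byHost.computeIfAbsent(node.host, h -> new HashSet<>()).add(node);
  }

  synchronized int nodesOn(String host) {
    Set<Node> nodes = byHost.get(host);
    return nodes == null ? 0 : nodes.size();
  }

  synchronized int size() {
    return byUuid.size();
  }
}
```

If a node with the same UUID registers again from a different host, the old host entry disappears and only the new one remains, so the two indexes never disagree.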

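That completes DataNode registration, and the flow is fairly clear. Some parts were left out, either because they are not very useful (read: I do not fully understand them) or because they are not that important (read: I was too lazy to go through them). Next comes the other core topic: how the DataNode keeps its heartbeat with the NameNode.

DataNode Heartbeat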

/**
 * Main loop for each BP thread. Run until shutdown,
 * forever calling remote NameNode functions.
 */
private void offerService() throws Exception {
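  // The same long-running loop pattern as before
  while (shouldRun()) {
    try {
      final long startTime = monotonicNow();

      // Current time - last heartbeat >= heartbeat interval (3 s by default)
      if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {

        lastHeartbeat = startTime;
        if (!dn.areHeartbeatsDisabledForTests()) {
          // Send the heartbeat
          HeartbeatResponse resp = sendHeartBeat();
          
          // The heartbeat response carries commands from the NameNode. The NameNode never sends commands to a DataNode directly; it passes them back through the heartbeat
          if (!processCommand(resp.getCommands()))
            continue;
        }
      }
      // ... (block reports, the catch blocks and the rest of the loop are omitted)
    }
  }
}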
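The interval check in offerService boils down to comparing a monotonic timestamp with the time of the last heartbeat. A minimal sketch follows, with the clock passed in as a parameter so it can be exercised without waiting. HeartbeatTimer and its method names are hypothetical.

```java
/** Hypothetical timer that decides when the next heartbeat is due. */
final class HeartbeatTimer {
  private final long intervalMillis;
  private long lastHeartbeat; // starts at 0, so the first check is due at once

  HeartbeatTimer(long intervalMillis) {
    this.intervalMillis = intervalMillis;
  }

  /** Monotonic clock in milliseconds; unaffected by wall-clock adjustments. */
  static long monotonicNowMillis() {
    return System.nanoTime() / 1_000_000L;
  }

  /** Returns true and records the time if a heartbeat is due at nowMillis. */
  boolean isDue(long nowMillis) {
    if (nowMillis - lastHeartbeat >= intervalMillis) {
      lastHeartbeat = nowMillis;
      return true;
    }
    return false;
  }
}
```

With a 3000 ms interval, isDue(10_000) is true, isDue(12_999) is false and isDue(13_000) is true again. Using a monotonic clock matters here because a wall-clock jump would otherwise delay or burst heartbeats.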

The code that sends the heartbeat

HeartbeatResponse sendHeartBeat() throws IOException {
  StorageReport[] reports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  
  VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
      .getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null ?
      volumeFailureSummary.getFailedStorageLocations().length : 0;
  // Important code: this shows what the heartbeat carries
  return bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary);
}

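As the code shows, the heartbeat carries the storage reports (capacity and usage), cache usage, the transfer and xceiver counts, and failed-volume information. That is why the web UI can show details for each DataNode (they arrive with the heartbeat). As before, the call lands in a NameNodeRpcServer method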

@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary)
    throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  // Handle the heartbeat sent by the DataNode
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary);
}

Continuing down into namesystem.handleHeartbeat

/**
 * The given node has reported in.  This method should:
 * 1) Record the heartbeat, so the datanode isn't timed out
 * 2) Adjust usage stats for future block allocation
 * 
 * If a substantial amount of time passed since the last datanode 
 * heartbeat then request an immediate block report.  
 * 
 * @return an array of datanode commands 
 * @throws IOException
 */
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    // Handle the heartbeat through the DatanodeManager
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
    
    // Return the commands
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
  } finally {
    readUnlock();
  }
}

Enter DatanodeManager's handleHeartbeat method

/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) throws IOException {
  synchronized (heartbeatManager) {
    synchronized (datanodeMap) {
      DatanodeDescriptor nodeinfo = null;
      try {
        // Look up the DataNode
        nodeinfo = getDatanode(nodeReg);
      } catch(UnregisteredNodeException e) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }

      if (nodeinfo == null || !nodeinfo.isAlive) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
      }

      // Important method: update the heartbeat information
      heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                       cacheCapacity, cacheUsed,
                                       xceiverCount, failedVolumes,
                                       volumeFailureSummary);

      // If we are in safemode, do not send back any recovery / replication
      // requests. Don't even drain the existing queue of work.
      if(namesystem.isInSafeMode()) {
        return new DatanodeCommand[0];
      }
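      // ... (the code that computes blocks, the blocks needing recovery, is omitted)
      if (blocks != null) {
        BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
            blocks.length);
        // Return the command
        return new DatanodeCommand[] { brCommand };
      }
    }
  }
  // No commands to return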
  return new DatanodeCommand[0];
}
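The shape of this handler, where an unknown node is told to register and a known node gets its pending commands back in the reply, can be sketched in a few lines. Everything here is hypothetical and simplified: HeartbeatHandlerSketch uses strings for commands and a plain map for the last-seen time, which is not how Hadoop models them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical server-side handler that piggybacks commands on heartbeats. */
final class HeartbeatHandlerSketch {
  static final String REGISTER = "REGISTER";

  private final Map<String, Long> lastSeen = new HashMap<>();
  private final Map<String, Deque<String>> pending = new HashMap<>();

  synchronized void register(String nodeId, long now) {
    lastSeen.put(nodeId, now);
  }

  /** Queues a command; it is delivered with the node's next heartbeat reply. */
  synchronized void enqueue(String nodeId, String command) {
    pending.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).add(command);
  }

  synchronized List<String> handleHeartbeat(String nodeId, long now) {
    if (!lastSeen.containsKey(nodeId)) {
      // unknown node: the only reply is a request to register first
      return Collections.singletonList(REGISTER);
    }
    lastSeen.put(nodeId, now); // record the heartbeat
    Deque<String> queue = pending.remove(nodeId);
    return queue == null
        ? Collections.<String>emptyList()
        : new ArrayList<>(queue);
  }
}
```

The server never opens a connection to the node in this design. It only answers, so commands wait in the queue until the node's next heartbeat arrives.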

Continuing down into the heartbeatManager.updateHeartbeat method

synchronized void updateHeartbeat(final DatanodeDescriptor node,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  stats.subtract(node);
  // Core method
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
    xceiverCount, failedVolumes, volumeFailureSummary);
  stats.add(node);
}
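The subtract, update, add sequence keeps cluster-wide totals correct without rescanning every node: the node's old contribution is removed, the node is changed, and its new contribution is added back. A minimal sketch follows; ClusterStatsSketch and its fields are hypothetical and track only two numbers.

```java
/** Hypothetical aggregate that is kept in sync with per-node values. */
final class ClusterStatsSketch {
  static final class Node {
    long capacity;
    long used;
  }

  private long totalCapacity;
  private long totalUsed;

  synchronized void add(Node n) {
    totalCapacity += n.capacity;
    totalUsed += n.used;
  }

  synchronized void subtract(Node n) {
    totalCapacity -= n.capacity;
    totalUsed -= n.used;
  }

  /** Applies new values to a node that is already counted in the totals. */
  synchronized void update(Node n, long capacity, long used) {
    subtract(n);          // remove the old contribution
    n.capacity = capacity;
    n.used = used;
    add(n);               // add the new contribution
  }

  synchronized long totalCapacity() {
    return totalCapacity;
  }

  synchronized long totalUsed() {
    return totalUsed;
  }
}
```

The whole sequence runs under one lock, so a reader never observes totals with the node's contribution missing.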

Next is DatanodeDescriptor's updateHeartbeat method

/**
 * Updates stats from datanode heartbeat.
 */
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  // Core method
  updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  heartbeatedSinceRegistration = true;
}
/**
 * process datanode heartbeat or stats initialization.
 */
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  long totalCapacity = 0;
  long totalRemaining = 0;
  long totalBlockPoolUsed = 0;
  long totalDfsUsed = 0;
  Set<DatanodeStorageInfo> failedStorageInfos = null;
  
  // Update the DataNode's storage information
  setCacheCapacity(cacheCapacity);
  setCacheUsed(cacheUsed);
  setXceiverCount(xceiverCount);

  // Record the time of the latest heartbeat
  setLastUpdate(Time.now());
}

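The call chain ends here, yet nothing so far actually judges whether a heartbeat has expired. The recorded time must be used somewhere, though. Looking back at NameNode startup, there is one core thread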

/** 
 * Start services common to both active and standby states
 */
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  
    // Start important services (heartbeat monitoring)
    blockManager.activate(conf);
}

If it is unclear where this comes from, see the previous article on the NameNode startup flow

public void activate(Configuration conf) {
  pendingReplications.start();
  // Heartbeat management service
  datanodeManager.activate(conf);
  this.replicationThread.start();
}
void activate(final Configuration conf) {
  decomManager.activate(conf);
  // Manage heartbeats
  heartbeatManager.activate(conf);
}
void activate(Configuration conf) {
  heartbeatThread.start();
}

A thread shows up here, so find its run method

/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
  private long lastHeartbeatCheck;
  private long lastBlockKeyUpdate;

  @Override
  public void run() {
    while(namesystem.isRunning()) {
      try {
        final long now = Time.monotonicNow();
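        // The loop wakes up every 5 seconds, but the check itself only runs once heartbeatRecheckInterval (5 minutes by default) has elapsed
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          // Heartbeat check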
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
        if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
          synchronized(HeartbeatManager.this) {
            for(DatanodeDescriptor d : datanodes) {
              d.needKeyUpdate = true;
            }
          }
          lastBlockKeyUpdate = now;
        }
      } catch (Exception e) {
        LOG.error("Exception while checking heartbeat", e);
      }
      try {
        Thread.sleep(5000);  // 5 seconds
      } catch (InterruptedException ie) {
      }
    }
  }
}
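The Monitor loop has two different periods: it wakes up every 5 seconds, but it only runs heartbeatCheck once heartbeatRecheckInterval has passed since the previous check. The sketch below simulates that with a fake clock instead of real sleeping. MonitorSketch and countChecks are hypothetical, and the numbers in the usage note assume a 5000 ms sleep and a 300000 ms recheck interval.

```java
/** Hypothetical simulation of the Monitor loop's two time scales. */
final class MonitorSketch {
  /**
   * Counts how many checks fire over a number of simulated wakeups.
   *
   * @param startNow      simulated monotonic time of the first wakeup, in ms
   * @param wakeups       number of loop iterations to simulate
   * @param sleepMillis   simulated sleep between iterations
   * @param recheckMillis minimum time that must pass between two checks
   */
  static int countChecks(long startNow, int wakeups, long sleepMillis,
      long recheckMillis) {
    long lastCheck = 0; // like lastHeartbeatCheck, initially 0
    int checks = 0;
    long now = startNow;
    for (int i = 0; i < wakeups; i++) {
      // same strict comparison as in Monitor.run()
      if (lastCheck + recheckMillis < now) {
        checks++;
        lastCheck = now;
      }
      now += sleepMillis; // stands in for Thread.sleep(5000)
    }
    return checks;
  }
}
```

Simulating 200 wakeups from time 1000000 gives 4 checks, at wakeups 0, 61, 122 and 183. Because the comparison is strict and the loop only looks every 5 seconds, the effective period in this simulation is 305 seconds rather than exactly 300.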

heartbeatCheck

/**
 * Check if there are any expired heartbeats, and if so,
 * whether any blocks have to be re-replicated.
 * While removing dead datanodes, make sure that only one datanode is marked
 * dead at a time within the synchronized section. Otherwise, a cascading
 * effect causes more datanodes to be declared dead.
 * Check if there are any failed storage and if so,
 * Remove all the blocks on the storage. It also covers the following less
 * common scenarios. After DatanodeStorage is marked FAILED, it is still
 * possible to receive IBR for this storage.
 * 1) DN could deliver IBR for failed storage due to its implementation.
 *    a) DN queues a pending IBR request.
 *    b) The storage of the block fails.
 *    c) DN first sends HB, NN will mark the storage FAILED.
 *    d) DN then sends the pending IBR request.
 * 2) SBN processes block request from pendingDNMessages.
 *    It is possible to have messages in pendingDNMessages that refer
 *    to some failed storage.
 *    a) SBN receives a IBR and put it in pendingDNMessages.
 *    b) The storage of the block fails.
 *    c) Edit log replay get the IBR from pendingDNMessages.
 * Alternatively, we can resolve these scenarios with the following approaches.
 * A. Make sure DN don't deliver IBR for failed storage.
 * B. Remove all blocks in PendingDataNodeMessages for the failed storage
 *    when we remove all blocks from BlocksMap for that storage.
 */
void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();
  // It's OK to check safe mode w/o taking the lock here, we re-check
  // for safe mode after taking the lock before removing a datanode.
  if (namesystem.isInStartupSafeMode()) {
    return;
  }
  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeID dead = null;

    // locate the first failed storage that isn't on a dead node.
    DatanodeStorageInfo failedStorage = null;

    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized(this) {
      // Iterate over all DataNodes
      for (DatanodeDescriptor d : datanodes) {
        // How is a DataNode judged dead?
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
      }
    }
  }
}

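This method makes things fairly obvious: it iterates over all DataNodes and checks each one for being dead

/** Is the datanode dead? */
// The big question ⬆️
boolean isDatanodeDead(DatanodeDescriptor node) {
  /**
   * heartbeatExpireInterval = 630 s
   * this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval(5*60*1000)
   *        + 10 * 1000 * heartbeatIntervalSeconds(3);
   * That is, a DN with no heartbeat for more than 2 * 5 min + 10 * 3 s = 10 min 30 s (630 s) is considered dead
   */
  // Clearly, if the DN's last heartbeat is more than 630 s old, it is considered dead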
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}

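The default values of these properties can be looked up in the source; the number I arrived at is 630 s, and this interval decides whether a DN is dead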
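The arithmetic behind the 630 s figure can be checked with a few lines of code. This is a minimal sketch: HeartbeatExpirySketch and its method names are hypothetical, and the defaults of 300000 ms for the recheck interval and 3 s for the heartbeat interval are the values quoted in the comment above.

```java
/** Hypothetical helper that reproduces the expiry formula and the dead check. */
final class HeartbeatExpirySketch {
  /** 2 * recheck interval (ms) + 10 * heartbeat interval (s, converted to ms). */
  static long expireIntervalMillis(long recheckIntervalMillis,
      long heartbeatIntervalSeconds) {
    return 2 * recheckIntervalMillis + 10 * 1000 * heartbeatIntervalSeconds;
  }

  /** Same comparison as isDatanodeDead, with the clock passed in. */
  static boolean isDead(long lastUpdateMonotonic, long now,
      long expireIntervalMillis) {
    return lastUpdateMonotonic < now - expireIntervalMillis;
  }
}
```

With the defaults, expireIntervalMillis(300000, 3) is 630000 ms. A node whose last heartbeat was at time 0 is still alive at time 630000 and is considered dead from 630001 onward, since the comparison is strict.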

Figure: DN registration and heartbeat flow