DataNode启动流程
基于Hadoop 2.7源码,可以配合源码看这篇文章效果更好
找到入口(DataNode的main方法)
// Process entry point of the DataNode; delegates straight to secureMain
// with no secure resources.
public static void main(String args[]) {
// Entry point
secureMain(args, null);
}
进入secureMain方法
// Abbreviated excerpt of secureMain: creates the DataNode from the
// command-line arguments.
// NOTE(review): the returned datanode is unused here; the rest of the
// method appears to be omitted by the article.
public static void secureMain(String args[], SecureResources resources) {
// As the name says: create the DataNode (a null Configuration is passed).
DataNode datanode = createDataNode(args, null, resources);
}
进入createDataNode方法
/** Instantiate & Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
@VisibleForTesting
@InterfaceAudience.Private
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
// Instantiate the DataNode.
// NOTE(review): abbreviated excerpt. The method is declared to return a
// DataNode, but no return statement (and no daemon start) is shown here.
DataNode dn = instantiateDataNode(args, conf, resources);
}
进入instantiateDataNode方法
/** Instantiate a single datanode object, along with its secure resources.
* This must be run by invoking {@link DataNode#runDatanodeDaemon()}
* subsequently.
*/
public static DataNode instantiateDataNode(String args [], Configuration conf,
SecureResources resources) throws IOException {
// Create the instance.
// NOTE(review): abbreviated excerpt. dataLocations is computed in code the
// article omits (presumably from the configured data directories).
return makeInstance(dataLocations, conf, resources);
}
进入makeInstance方法
/**
* Make an instance of DataNode after ensuring that at least one of the
* given data directories (and their parent directories, if necessary)
* can be created.
* @param dataDirs List of directories, where the new DataNode instance should
* keep its files.
* @param conf Configuration instance to use.
* @param resources Secure resources needed to run under Kerberos
* @return DataNode instance for given list of data dirs and conf, or null if
* no directory from this directory list can be created.
* @throws IOException
*/
static DataNode makeInstance(Collection<StorageLocation> dataDirs,
Configuration conf, SecureResources resources) throws IOException {
// Construct the new DataNode.
// NOTE(review): abbreviated excerpt. `locations` is not declared here;
// presumably it is the subset of dataDirs that passed the directory checks
// omitted by the article.
return new DataNode(conf, locations, resources);
}
进入DataNode构造函数
/**
* Create the DataNode given a configuration, a list of data directories
* and the secure resources it runs with.
*/
DataNode(final Configuration conf,
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
// Start the DataNode's services.
startDataNode(conf, dataDirs, resources);
}
进入startDataNode方法
其中有几个重要的方法,我挑出了自己认为重要的几个,在下面逐一说明
/**
* This method starts the data node with the specified conf.
*
* @param conf - the configuration
* if conf's CONFIG_PROPERTY_SIMULATED property is set
* then a simulated storage based data node is created.
*
* @param dataDirs - only for a non-simulated storage data node
* @param resources - secure resources needed to run under Kerberos
* @throws IOException
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
SecureResources resources
) throws IOException {
// Key step 1: register the DataNode's MXBean (JMX).
registerMXBean();
// Key step 2: initialize the DataXceiver server
// (presumably the block data-transfer endpoint; confirm in the full source).
initDataXceiver(conf);
// Key step 3: HTTP server.
startInfoServer(conf);
// Key step 4: RPC server.
initIpcServer(conf);
// Create the BlockPoolManager. Normally a cluster has a single block pool;
// with federation there is one per nameservice.
blockPoolManager = new BlockPoolManager(this);
// Key step 5: registration with, and heartbeats to, the NameNode(s).
blockPoolManager.refreshNamenodes(conf);
}
5个核心方法
- registerMXBean();
- initDataXceiver(conf);
- startInfoServer(conf) - HTTP Server(数据)
- initIpcServer(conf) - RPC Server(通信)
- refreshNamenodes(conf) - 注册和心跳
HTTP Server
这部分代码没什么难的,DataNode也有自己的HttpServer,里面也注册了一些Servlet,有兴趣的可以去看看通过addInternalServlet方法注册的Servlet类中doGet方法的具体实现
/**
* Create the DataNode's embedded HTTP (info) server and register its
* internal servlets.
*
* @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
* for information related to the different configuration options and
* how the Http Policy is decided.
*/
private void startInfoServer(Configuration conf)
    throws IOException {
  // Private copy of the configuration, so the thread cap below applies only
  // to the info server and not to the caller's conf.
  Configuration confForInfoServer = new Configuration(conf);
  confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
  // Configure the HttpServer2 builder. It must receive confForInfoServer:
  // passing the original conf would silently drop the 10-thread cap above.
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .addEndpoint(URI.create("http://localhost:0"))
      .setFindPort(true);
  // Build the server instance (this excerpt does not show it being started).
  this.infoServer = builder.build();
  // Register the internal servlets.
  this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
      FileChecksumServlets.GetServlet.class);
}
RPC Server
RPC也是理解Hadoop源码的一个重要前提,如果不理解可以先看看RPC的简单应用
private void initIpcServer(Configuration conf) throws IOException {
// 构建了一个DataNode的RPC(此处有设计模式))
ipcServer = new RPC.Builder(conf)
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
.setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build()
}
}
DataNode也启动了一个RPC,是为了干什么,后面用到再提,先记住DataNode也有自己的RPCServer(通信)
注册和心跳
这部分是DataNode的核心之一,也是所有分布式系统的基石
// Reconcile the configured nameservices (addrMap: nameservice id -> NameNode
// addresses) with the BPOfferServices this DataNode currently runs.
// The caller must hold refreshNamenodesLock (asserted below).
// NOTE(review): abbreviated excerpt. The method's closing brace and the steps
// that act on toRefresh/toRemove are not shown here.
private void doRefreshNamenodes(
Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
assert Thread.holdsLock(refreshNamenodesLock);
Set<String> toRefresh = Sets.newLinkedHashSet();
Set<String> toAdd = Sets.newLinkedHashSet();
Set<String> toRemove;
synchronized (this) {
// Step 1. For each of the new nameservices, figure out whether
// it's an update of the set of NNs for an existing NS,
// or an entirely new nameservice.
for (String nameserviceId : addrMap.keySet()) {
if (bpByNameserviceId.containsKey(nameserviceId)) {
toRefresh.add(nameserviceId);
} else {
toAdd.add(nameserviceId);
}
}
// Step 2. Any nameservices we currently have but are no longer present
// need to be removed.
toRemove = Sets.newHashSet(Sets.difference(
bpByNameserviceId.keySet(), addrMap.keySet()));
// Sanity check: every configured nameservice is either refreshed or added.
assert toRefresh.size() + toAdd.size() ==
addrMap.size() :
"toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
" toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
" toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
// Step 3. Start new nameservices
if (!toAdd.isEmpty()) {
LOG.info("Starting BPOfferServices for nameservices: " +
Joiner.on(",").useForNull("<default>").join(toAdd));
// One BPOfferService per new nameservice, built from that nameservice's
// NameNode addresses and tracked in both bpByNameserviceId and
// offerServices.
for (String nsToAdd : toAdd) {
ArrayList<InetSocketAddress> addrs =
Lists.newArrayList(addrMap.get(nsToAdd).values());
BPOfferService bpos = createBPOS(addrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
}
// Start every BPOfferService in offerServices, including the new ones.
startAll();
}
前面都是一些conf的解析,我们直接进入重要方法startAll
// Start every BPOfferService, running as the DataNode's login user.
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Key class BPOfferService: one is created per nameservice
// (federation namespace).
for (BPOfferService bpos : offerServices) {
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
// Surface the interruption to callers as an IOException.
// NOTE(review): initCause(ex.getCause()) records the interruption's own
// cause (usually null) rather than ex itself, and the thread's interrupt
// flag is not restored. Consider initCause(ex) plus
// Thread.currentThread().interrupt().
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
接着看看bpos.start方法
// BPOfferService.start: start one BPServiceActor per NameNode of this
// nameservice.
void start() {
// BPServiceActor: one is created for each NameNode.
for (BPServiceActor actor : bpServices) {
// Every NameNode gets its own started actor. In an HA pair that means
// both the active and the standby NameNode.
actor.start();
}
}
再进入start方法
// BPServiceActor.start: launch this actor's daemon thread (which runs this
// object's run()). It is a no-op if that thread is already alive.
//This must be called only by BPOfferService
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
bpThread.setDaemon(true); // needed for JUnit testing
bpThread.start();
}
这个地方是一个线程启动,直接寻找run方法
/**
* No matter what kind of exception we get, keep retrying to offerService().
* That's the loop that connects to the NameNode and provides basic DataNode
* functionality.
*
* Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
* happen either at shutdown or due to refreshNamenodes.
*/
@Override
public void run() {
LOG.info(this + " starting to offer service");
try {
// Phase 1: handshake/registration. Retried every 5s while
// shouldRetryInit() allows it; otherwise the actor gives up.
while (true) {
// init stuff
try {
// setup storage
// Key step 1: fetch the namespace info from the NameNode and register
// this DataNode with it (a very important method).
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
runningState = RunningState.RUNNING;
// Phase 2: heartbeat loop. Any exception is logged and retried after 5s.
while (shouldRun()) {
try {
// Key step 2: heartbeats.
offerService();
} catch (Exception ex) {
LOG.error("Exception in BPOfferService for " + this, ex);
sleepAndLogInterrupts(5000, "offering service");
}
}
runningState = RunningState.EXITED;
} catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally {
// Always runs cleanUp(), however the loops above ended.
LOG.warn("Ending block pool service for: " + this);
cleanUp();
}
}
这里有一块代码可以分析一下
// Excerpt of run(): keep attempting the handshake until it succeeds (break),
// sleeping 5s between attempts while shouldRetryInit() is true. Otherwise
// mark the actor FAILED and return.
while (true) {
// init stuff
try {
// setup storage
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
// Initial handshake, storage recovery or registration failed
runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
}
connectToNNAndHandshake是DataNode向NameNode获取元数据信息并注册自己的方法,非常重要。这段代码可以简单理解为:合理使用while(true)循环,失败时休眠后重试,成功后break跳出,以尽可能保证这段重要代码执行成功
// Simplified shape of the retry pattern: loop until the core call succeeds,
// handling the IOException (details elided) on each failure.
while (true) {
try {
// Core call
connectToNNAndHandshake();
break;
} catch (IOException ioe) {
......
}
}
这里有2个核心方法:connectToNNAndHandshake(注册)和offerService(心跳)。接下来的源码分析需要一个前提,就是理解Hadoop RPC
DataNode注册
// Handshake with one NameNode: obtain an RPC proxy, fetch and verify its
// namespace info, then register this DataNode.
private void connectToNNAndHandshake() throws IOException {
// Key step 1: obtain the NameNode proxy; everything below depends on it.
// Calls on bpNamenode are served by the NameNode's NameNodeRpcServer.
bpNamenode = dn.connectToNN(nnAddr);
// Key step 2: fetch the NameNode's namespace (metadata) info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Key step 3: register.
register(nsInfo);
}
获取NameNode代理
RPC.getProtocolProxy方法直接拿到NameNode的代理,并赋值给bpNamenode
// Create a client-side RPC proxy for DatanodeProtocolPB at nameNodeAddr,
// acting as ugi and using the configured socket factory and ping interval.
private static DatanodeProtocolPB createNamenode(
InetSocketAddress nameNodeAddr, Configuration conf,
UserGroupInformation ugi) throws IOException {
return RPC.getProtocolProxy(DatanodeProtocolPB.class,
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
}
获取NameNode的元数据信息
元数据信息是存在NameNode上的,DataNode想拿到元数据信息就必须和NameNode通信,于是乎RPC登场了
// Client side: ask the NameNode for its NamespaceInfo over protobuf RPC,
// converting the protobuf reply and unwrapping ServiceException into the
// remote IOException.
@Override
public NamespaceInfo versionRequest() throws IOException {
try {
// Core call: rpcProxy.versionRequest (executed on the NameNode).
return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
VOID_VERSION_REQUEST).getInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
这个时候rpcProxy.versionRequest是跳不进去了,因为是NameNode的代理,那么方法肯定在NameNode的RPC服务上,也就是NameNodeRpcServer这个类里面,直接到这个类中去搜索能查看到下面的代码
// Server side (NameNodeRpcServer): return the namespace info after checking
// that startup has finished and the caller has superuser privilege.
@Override // DatanodeProtocol, NamenodeProtocol
public NamespaceInfo versionRequest() throws IOException {
// Make sure the NameNode has finished starting up.
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.getNamespaceInfo();
}
返回的元数据信息是些什么,可以通过namesystem.getNamespaceInfo方法看看
代理也有了,接下来DataNode就可以注册自己了
/**
* Register one bp with the corresponding NameNode
* <p>
* The bpDatanode needs to register with the namenode on startup in order
* 1) to report which storage it is serving now and
* 2) to receive a registrationID issued by the namenode to recognize
* registered datanodes.
*
* Only EOFException and SocketTimeoutException are retried (after a 1s
* pause, while shouldRun() holds); any other IOException propagates.
*
* @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
bpRegistration = bpos.createRegistration();
LOG.info(this + " beginning handshake with NN");
while (shouldRun()) {
try {
// Use returned registration from namenode with updated fields
// Core call: register this DataNode with the NameNode (an RPC).
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
bpRegistration.setNamespaceInfo(nsInfo);
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
+ e.getLocalizedMessage());
sleepAndLogInterrupts(1000, "connecting to server");
} catch(SocketTimeoutException e) { // namenode is busy
LOG.info("Problem connecting to server: " + nnAddr);
sleepAndLogInterrupts(1000, "connecting to server");
}
}
}
继续往下
// Client side: send the registration to the NameNode as a protobuf request
// and return the (possibly updated) registration it sends back.
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
) throws IOException {
RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration));
RegisterDatanodeResponseProto resp;
try {
// Core call: register the DataNode over RPC.
resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getRegistration());
}
又是代理,老办法,直接到NameNodeRpcServer中去找registerDatanode方法
// Server side (NameNodeRpcServer): check startup and software version, hand
// the registration to the namesystem, and return the same object to the
// caller.
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
// Make sure the NameNode has finished starting up.
checkNNStartup();
verifySoftwareVersion(nodeReg);
namesystem.registerDatanode(nodeReg);
return nodeReg;
}
registerDatanode方法很长,这里列出核心的方法
/**
* Register the given datanode with the namenode. NB: the given
* registration is mutated and given back to the datanode.
*
* @param nodeReg the datanode registration
* @throws DisallowedDatanodeException if the registration request is
* denied because the datanode does not match includes/excludes
* @throws UnresolvedTopologyException if the registration request is
* denied because resolving datanode network location fails.
*/
public void registerDatanode(DatanodeRegistration nodeReg) {
// NOTE(review): abbreviated excerpt. nodeDescr (presumably the
// DatanodeDescriptor built from nodeReg) and the throws clause are in code
// omitted here.
// register new datanode
// Add the DataNode to the manager's maps and network topology.
addDatanode(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
// Also start tracking the node in the HeartbeatManager.
heartbeatManager.addDatanode(nodeDescr);
}
/** Add a datanode. */
void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
synchronized(datanodeMap) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
// Index the new DataNode by host.
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
+ "node " + node + " is added to datanodeMap.");
}
}
整个DataNode的注册就完成了,还是相对比较清晰的,其中省略了一些没(我)什(不)么(太)用(会)的或者相对不是那(我)么(懒)重(的)要(看)的东西,接下来就是另一个核心的DataNode是如何和NameNode保持心跳信息的
DataNode心跳
/**
* Main loop for each BP thread. Run until shutdown,
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
// NOTE(review): abbreviated excerpt. The catch clause and the closing
// braces of the loop and method are not shown here.
// Same loop-until-stopped pattern as in run().
while (shouldRun()) {
try {
final long startTime = monotonicNow();
// Heartbeat once heartBeatInterval has elapsed since the last one
// (default interval: 3s).
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
lastHeartbeat = startTime;
if (!dn.areHeartbeatsDisabledForTests()) {
// Send the heartbeat.
HeartbeatResponse resp = sendHeartBeat();
// The heartbeat reply carries the NameNode's commands: the NameNode
// does not call the DataNode directly, it piggybacks instructions on
// heartbeat responses. If processCommand returns false, skip the rest
// of this iteration.
if (!processCommand(resp.getCommands()))
continue;
}
}
}
发送心跳信息代码
// Build and send one heartbeat for this block pool; returns the NameNode's
// response.
HeartbeatResponse sendHeartBeat() throws IOException {
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
// Failed-volume count; 0 when no failure summary is available.
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
// The heartbeat payload: registration, per-storage reports, cache capacity
// and usage, in-progress transfers, transceiver count and volume failures.
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
}
从代码中可以看出来,心跳信息包括了存储容量(storage reports)、缓存容量与使用量、失败的磁盘卷(failed volumes)等数据,这就是为什么WEB界面有每个DataNode的相关信息了(心跳带过去的),同样的,这里也是调用的NameNodeRpcServer的方法
// Server side (NameNodeRpcServer): validate the caller, then let the
// namesystem process the heartbeat and build the response.
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
// Process the DataNode's heartbeat. Note that xceiverCount is passed before
// xmitsInProgress, which matches handleHeartbeat's parameter order.
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary);
}
接着往下走
/**
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
* 2) Adjust usage stats for future block allocation
*
* If a substantial amount of time passed since the last datanode
* heartbeat then request an immediate block report.
*
* @return an array of datanode commands
* @throws IOException
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
// Hold the namesystem read lock for the whole call; released in finally.
readLock();
try {
//get datanode commands
// Transfer budget: max replication streams minus transfers already running.
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
// Delegate heartbeat processing to the DatanodeManager.
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
// Return the commands together with the HA state and rolling-upgrade info.
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
} finally {
readUnlock();
}
}
进入handleHeartbeat方法
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
// Look up the DataNode's descriptor.
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
// Unknown node: tell it to register again.
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Missing or not-alive node: also tell it to re-register.
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Key step: update the node's heartbeat state.
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if(namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
// NOTE(review): abbreviated excerpt. `blocks` is computed in code
// omitted here (presumably blocks pending recovery on this node).
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
// Return the command(s).
return new DatanodeCommand[] { brCommand };
}
}
}
// No commands for this DataNode.
return new DatanodeCommand[0];
}
继续往下heartbeatManager.updateHeartbeat方法
// HeartbeatManager.updateHeartbeat: apply a heartbeat to one node while
// keeping the aggregate stats consistent. The node's old contribution is
// subtracted, its state updated, and its new contribution added back.
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
// Key step: update the node's own heartbeat state.
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
}
接着来updateHeartbeat方法
/**
* Updates stats from datanode heartbeat.
* Also records that this node has heartbeated since it registered.
*/
public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
// Key step: apply the reported state.
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
heartbeatedSinceRegistration = true;
}
/**
* process datanode heartbeat or stats initialization.
*/
public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
// NOTE(review): abbreviated excerpt. The totals and failedStorageInfos
// below are declared, but the code that fills them from reports is omitted.
long totalCapacity = 0;
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
long totalDfsUsed = 0;
Set<DatanodeStorageInfo> failedStorageInfos = null;
// Update the DataNode's cache and transceiver stats.
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
// Record the time of this latest heartbeat (wall clock, Time.now()).
// NOTE(review): isDatanodeDead compares getLastUpdateMonotonic(); the
// monotonic timestamp update is presumably among the omitted lines. Confirm.
setLastUpdate(Time.now());
}
代码已经到头了,但到这儿还没有看到判断心跳是否超时的逻辑,不过这里记录的最后更新时间肯定会用到。回过头来看,在NameNode启动的过程中,有一个核心的线程
/**
* Start services common to both active and standby states
* (abbreviated excerpt: only the BlockManager activation is shown).
*/
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
// Activate the BlockManager's services, which include heartbeat monitoring.
blockManager.activate(conf);
}
不清楚怎么来的可以看看上一篇NameNode的启动流程
// BlockManager.activate: start pending-replication tracking, the
// DatanodeManager (which owns heartbeat management) and the replication
// thread.
public void activate(Configuration conf) {
pendingReplications.start();
// Heartbeat management lives under the DatanodeManager.
datanodeManager.activate(conf);
this.replicationThread.start();
}
// DatanodeManager.activate: start decommission management and the
// HeartbeatManager.
void activate(final Configuration conf) {
decomManager.activate(conf);
// Heartbeat management.
heartbeatManager.activate(conf);
}
// HeartbeatManager.activate: start the heartbeat-monitor thread (presumably
// running the Monitor runnable; confirm where heartbeatThread is created).
void activate(Configuration conf) {
heartbeatThread.start();
}
这里出现了一个线程,找到run方法
/** Periodically check heartbeat and update block key */
private class Monitor implements Runnable {
private long lastHeartbeatCheck;
private long lastBlockKeyUpdate;
@Override
public void run() {
// Loop for as long as the namesystem is running.
while(namesystem.isRunning()) {
try {
final long now = Time.monotonicNow();
// The loop wakes every 5 seconds, but heartbeatCheck() only runs once
// heartbeatRecheckInterval has elapsed since the previous check
// (5 minutes by default), not on every wake-up.
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
// Heartbeat check.
heartbeatCheck();
lastHeartbeatCheck = now;
}
// When the block manager says a key update is due, flag every DataNode
// so it receives updated block keys.
if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
}
}
lastBlockKeyUpdate = now;
}
} catch (Exception e) {
LOG.error("Exception while checking heartbeat", e);
}
try {
Thread.sleep(5000); // 5 seconds
} catch (InterruptedException ie) {
// NOTE(review): the interruption is swallowed; the loop keeps running
// while namesystem.isRunning().
}
}
}
}
heartbeatCheck
/**
* Check if there are any expired heartbeats, and if so,
* whether any blocks have to be re-replicated.
* While removing dead datanodes, make sure that only one datanode is marked
* dead at a time within the synchronized section. Otherwise, a cascading
* effect causes more datanodes to be declared dead.
* Check if there are any failed storage and if so,
* Remove all the blocks on the storage. It also covers the following less
* common scenarios. After DatanodeStorage is marked FAILED, it is still
* possible to receive IBR for this storage.
* 1) DN could deliver IBR for failed storage due to its implementation.
* a) DN queues a pending IBR request.
* b) The storage of the block fails.
* c) DN first sends HB, NN will mark the storage FAILED.
* d) DN then sends the pending IBR request.
* 2) SBN processes block request from pendingDNMessages.
* It is possible to have messages in pendingDNMessages that refer
* to some failed storage.
* a) SBN receives a IBR and put it in pendingDNMessages.
* b) The storage of the block fails.
* c) Edit log replay get the IBR from pendingDNMessages.
* Alternatively, we can resolve these scenarios with the following approaches.
* A. Make sure DN don't deliver IBR for failed storage.
* B. Remove all blocks in PendingDataNodeMessages for the failed storage
* when we remove all blocks from BlocksMap for that storage.
*/
void heartbeatCheck() {
// NOTE(review): abbreviated excerpt. The code that sets allAlive and
// removes the dead node / failed storage is omitted, so the loop as shown
// never terminates.
final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (namesystem.isInStartupSafeMode()) {
return;
}
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
DatanodeID dead = null;
// locate the first failed storage that isn't on a dead node.
DatanodeStorageInfo failedStorage = null;
// check the number of stale nodes
int numOfStaleNodes = 0;
int numOfStaleStorages = 0;
synchronized(this) {
// Scan all DataNodes.
for (DatanodeDescriptor d : datanodes) {
// How is a DN judged dead? See dm.isDatanodeDead. Only the first dead
// node found is recorded, and it bumps the expired-heartbeat counter.
if (dead == null && dm.isDatanodeDead(d)) {
stats.incrExpiredHeartbeats();
dead = d;
}
}
}
}
}
这个方法里面就比较明显了,先遍历所有的DataNode,然后进行dead的判断
/** Is the datanode dead? */
// The key question: when is a DataNode considered dead?
boolean isDatanodeDead(DatanodeDescriptor node) {
/**
* heartbeatExpireInterval = 630s
* this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval(5*60*1000)
* + 10 * 1000 * heartbeatIntervalSeconds(3);
* i.e. a DN whose last heartbeat is older than
* 2 * 5min + 10 * 3s = 10min30s (630s) is treated as dead.
*/
// In other words: no heartbeat for more than 630s means dead. Both sides of
// the comparison use the monotonic clock.
return (node.getLastUpdateMonotonic() <
(monotonicNow() - heartbeatExpireInterval));
}
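具体的属性默认值可以自己找找:dfs.namenode.heartbeat.recheck-interval 默认为300000毫秒(5分钟),dfs.heartbeat.interval 默认为3秒,按 2 × 5分钟 + 10 × 3秒 计算得到630s,这个时间决定DN是不是dead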