NameNode启动流程
基于Hadoop 2.7源码,可以配合源码看这篇文章效果更好
在浏览前先带着几个问题
- NameNode fsImage文件合并
- NameNode 如何通信
- WEB界面怎么来的
- 哪些条件会触发HDFS的安全模式
找到NameNode类,直接开始源码走读
/**
 * NameNode command-line entry point: builds the NameNode instance,
 * which kicks off the whole startup sequence.
 */
public static void main(String argv[]) throws Exception {
  // Construct the NameNode (excerpt; the real method also parses args
  // and joins the started services).
  NameNode nn = createNameNode(argv, null);
}
createNameNode方法太长,我们直接跳到最后一个case,new NameNode()方法中去
/**
 * Construct a NameNode for the given role and run its initialization.
 *
 * @param conf the Hadoop configuration
 * @param role the role this NameNode plays in the cluster
 * @throws IOException if initialization fails
 */
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
// All real startup work happens here (HTTP server, FSImage load, RPC server, ...).
initialize(conf);
}
几个核心方法所在的地方
- startHttpServer - WEB界面
- loadNamesystem - 文件合并
- createRpcServer - NNRpcServer
- startCommonServices - 其他核心方法
/**
 * Initialize name-node.
 *
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {
// Core step 1: start the embedded HTTP server (the NN web UI).
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// Core step 2: load and merge the on-disk metadata (fsimage + edit log).
loadNamesystem(conf);
// Core step 3: create the NameNode RPC server(s).
rpcServer = createRpcServer(conf);
// Core step 4: start services common to active and standby states.
startCommonServices(conf);
}
方法startHttpServer
/**
 * Start the NameNode HTTP server that serves the web UI.
 *
 * @param conf configuration the bind address/port is read from
 *             (this is where the classic web port 50070 comes from)
 * @throws IOException if the server cannot be started
 */
private void startHttpServer(final Configuration conf) throws IOException {
// Resolve the bind address from configuration, then start the server.
httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
httpServer.start();
httpServer.setStartupProgress(startupProgress);
}
进入start方法
/**
 * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
 * for information related to the different configuration options and
 * Http Policy is decided.
 */
void start() throws IOException {
// Build the HttpServer2 instance from the shared NN/JN template.
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "hdfs",
DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
httpServer = builder.build();
// Register the NameNode servlets (fsck, listPaths, imagetransfer, ...).
setupServlets(httpServer, conf);
}
看看setupServlets这个方法
/**
 * Register the NameNode's internal servlets on the HTTP server.
 * The boolean flag on some registrations enables authentication for
 * that servlet; path specs ending in "/*" match sub-paths.
 */
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
httpServer.addInternalServlet("startupProgress",
StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
httpServer.addInternalServlet("getDelegationToken",
GetDelegationTokenServlet.PATH_SPEC,
GetDelegationTokenServlet.class, true);
httpServer.addInternalServlet("renewDelegationToken",
RenewDelegationTokenServlet.PATH_SPEC,
RenewDelegationTokenServlet.class, true);
httpServer.addInternalServlet("cancelDelegationToken",
CancelDelegationTokenServlet.PATH_SPEC,
CancelDelegationTokenServlet.class, true);
httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
true);
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",
FileDataServlet.class, false);
httpServer.addInternalServlet("checksum", "/fileChecksum/*",
FileChecksumServlets.RedirectServlet.class, false);
httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
ContentSummaryServlet.class, false);
}
增加了这么多的Servlet,我们找一个listPaths到web看看,怎么看呢,在浏览器直接输入
http://hdfs:50070/listPaths/user/hive
HttpServer的大致流程图
结果如下(我格式化一下),看数据就能得出,其他的Servlet可以一个一个试试看
<?xml version="1.0" encoding="UTF-8"?>
<listing filter=".*" path="/user/hive" exclude="" time="2020-10-22T08:41:43+0000" version="2.6.0-cdh5.14.4" recursive="no">
<directory path="/user/hive" modified="2019-03-04T05:03:51+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwxrwxr-t" owner="hive" group="hive"/>
<directory path="/user/hive/.staging" modified="2019-03-06T10:43:09+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwx------" owner="hive" group="hive"/>
<file path="/user/hive/hive-udf" modified="2019-03-01T06:52:26+0000" accesstime="2020-10-21T07:42:08+0000" size="6246" replication="3" blocksize="134217728" permission="-rw-rw-r--" owner="hdfs" group="hive"/>
<directory path="/user/hive/warehouse" modified="2020-08-25T09:12:27+0000" accesstime="1970-01-01T00:00:00+0000" permission="drwxrwxrwt" owner="hive" group="hive"/>
</listing>
我们回到第二个核心方法
loadNamesystem方法
/**
 * Build an FSNamesystem by loading the on-disk metadata.
 *
 * @param conf the configuration pointing at the name/edits directories
 * @return the loaded FSNamesystem
 * @throws IOException if the fsimage cannot be loaded
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));
  // These two lines were missing from the original excerpt: 'namesystem'
  // and 'startOpt' were used below without ever being defined. Restored
  // as in the Hadoop 2.7 source of this method.
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  long loadStart = monotonicNow();
  try {
    // Load (and, if needed, merge) the fsimage plus edit logs.
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  return namesystem;
}
继续下一步,这个方法就大致展示了fsImage的合并流程,由于是刚刚启动,我们只简单过一下里面的大致步骤,后面会再回过头来看这个方法
- 步骤1:合并元数据 fsImage + editLog = new fsImage
- 步骤2:fsImage存到磁盘
- 步骤3:打开一个新的editLog来写日志
/**
 * Load the file-system image at startup (excerpt showing the three
 * main steps; the real method does more bookkeeping around them).
 *
 * @param startOpt how the NameNode was started (REGULAR, FORMAT, ...)
 * @throws IOException on any failure reading or writing metadata
 */
private void loadFSImage(StartupOption startOpt) throws IOException {
// Step 1: merge metadata: fsImage + editLog = new fsImage.
fsImage.recoverTransitionRead(startOpt, this, recovery);
// Step 2: persist the merged fsImage to disk.
fsImage.saveNamespace(this);
// Step 3: open a fresh editLog for subsequent writes.
fsImage.openEditLogForWrite();
}
接下来看另外的核心方法(需要了解Hadoop RPC的使用)
/**
 * Build the two RPC servers the NameNode listens on (excerpt).
 *
 * @param conf the configuration (ports, handler counts, bind host)
 * @param nn the owning NameNode
 * @throws IOException if a server cannot be created
 */
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
// serviceRpcServer: handles RPC requests from DataNodes; bound to
// serviceRpcAddr's port with serviceHandlerCount handler threads.
this.serviceRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
// clientRpcServer: handles RPC requests from HDFS clients; bound to
// rpcAddr's port with handlerCount handler threads.
this.clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService).setBindAddress(bindHost)
.setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
}
- serviceRpcServer
用于处理DataNode发送的RPC请求 - clientRpcServer
用于处理Client发送的RPC请求
NameNode在这儿就启动了2个Rpc Server
Rpc Server流程图
还剩下最后一个方法
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
// Delegates to FSNamesystem: resource check, safe-mode setup, block manager.
namesystem.startCommonServices(conf, haContext);
}
里面有3个很重要的方法
/**
 * Start services common to both active and standby states
 */
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
// Check that the NN's local metadata disks have enough free space.
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
// Compute the block threshold that controls HDFS safe mode.
setBlockTotal();
// Start the important background services (e.g. heartbeat monitoring).
blockManager.activate(conf);
}
先看看NameNodeResourceChecker构造方法
/**
 * Create a NameNodeResourceChecker, which will check the edits dirs and any
 * additional dirs to check set in <code>conf</code>.
 *
 * @param conf source of the reserved-space threshold, the extra checked
 *             volumes, and the namespace edits directories
 * @throws IOException if a directory to check does not exist
 */
public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();
  // Minimum free space (bytes) a volume must keep (default 100MB).
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
  // Only local (file://) edits dirs can be checked for disk space.
  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          // Idiom fix: collapsed the redundant
          // "if (cond) return true; return false;" into a single return.
          return input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME);
        }
      });
  // Add all the local edits dirs, marking some as required if they are
  // configured as such (dirs come from hdfs-site.xml / core-site.xml).
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }
  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }
  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}
addDirToCheck方法也看看,主要是一个volumes对象
/**
 * Add the volume of the passed-in directory to the list of volumes to check.
 * If <code>required</code> is true, and this volume is already present, but
 * is marked redundant, it will be marked required. If the volume is already
 * present but marked required then this method is a no-op.
 *
 * @param directoryToCheck
 *          The directory whose volume will be checked for available space.
 */
private void addDirToCheck(URI directoryToCheck, boolean required)
    throws IOException {
  File dir = new File(directoryToCheck.getPath());
  if (!dir.exists()) {
    throw new IOException("Missing directory "+dir.getAbsolutePath());
  }
  // Each directory is wrapped in a CheckedVolume describing its volume.
  CheckedVolume candidate = new CheckedVolume(dir, required);
  CheckedVolume registered = volumes.get(candidate.getVolume());
  // Keep an existing entry only when it is already marked required;
  // a required volume must never be downgraded by a later redundant one.
  boolean keepExisting = registered != null && registered.isRequired();
  if (!keepExisting) {
    volumes.put(candidate.getVolume(), candidate);
  }
}
接着之前的继续看checkAvailableResources方法
/**
 * Perform resource checks and cache the results.
 */
void checkAvailableResources() {
Preconditions.checkState(nnResourceChecker != null,
"nnResourceChecker not initialized");
// Cached flag; later consulted by the safe-mode check (needEnter()).
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
继续走,这块需要注意一下之前被赋值的volumes
/**
 * Return true if disk space is available on at least one of the configured
 * redundant volumes, and all of the configured required volumes.
 *
 * @return True if the configured amount of disk space is available on at
 * least one redundant volume and all of the required volumes, false
 * otherwise.
 */
public boolean hasAvailableDiskSpace() {
// 'volumes' holds the directories registered for checking in the constructor.
return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
minimumRedundantVolumes);
}
继续走
/**
 * Return true if and only if there are sufficient NN
 * resources to continue logging edits.
 *
 * @param resources the collection of resources to check.
 * @param minimumRedundantResources the minimum number of redundant resources
 * required to continue operation.
 * @return true if and only if there are sufficient NN resources to
 * continue logging edits.
 */
static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {
  int requiredCount = 0;
  int redundantCount = 0;
  int redundantUnavailable = 0;
  // Walk every registered resource, tallying required vs. redundant ones.
  for (CheckableNameNodeResource resource : resources) {
    if (resource.isRequired()) {
      requiredCount++;
      // Short circuit - a required resource is not available.
      if (!resource.isResourceAvailable()) {
        return false;
      }
    } else {
      redundantCount++;
      if (!resource.isResourceAvailable()) {
        redundantUnavailable++;
      }
    }
  }
  // With no redundant resources configured, any available required resource
  // suffices; otherwise enough redundant resources must remain available.
  return redundantCount == 0
      ? requiredCount > 0
      : redundantCount - redundantUnavailable >= minimumRedundantResources;
}
看看isResourceAvailable这个方法,这是个接口,所以找到对应的实现类NameNodeResourceChecker
@Override
public boolean isResourceAvailable() {
  // Free space currently reported for this volume.
  long availableSpace = df.getAvailable();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume '" + volume + "' is "
        + availableSpace);
  }
  // Healthy as long as free space meets the reserved amount
  // (duReserved, initialized earlier; default is 100MB).
  if (availableSpace >= duReserved) {
    return true;
  }
  LOG.warn("Space available on volume '" + volume + "' is "
      + availableSpace +
      ", which is below the configured reserved amount " + duReserved);
  return false;
}
这里有一些计算了,如果剩余可用空间不足100M,就返回false,回到调用
/**
 * Perform resource checks and cache the results.
 */
void checkAvailableResources() {
Preconditions.checkState(nnResourceChecker != null,
"nnResourceChecker not initialized");
// In the article's low-disk example (< 100MB free) this ends up false.
hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
回到之前的地方,看下一个方法setBlockTotal
/**
 * Set the total number of blocks in the system.
 */
public void setBlockTotal() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
return;
// Feed the safe-mode tracker the number of usable blocks;
// getCompleteBlocksTotal() returns the count of COMPLETE blocks.
safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}
这个地方需要先看看getCompleteBlocksTotal方法的值
/**
 * Get the total number of COMPLETE blocks in the system.
 * For safe mode only complete blocks are counted.
 *
 * HDFS distinguishes two block states here:
 * 1) COMPLETE: fully written, usable blocks
 * 2) under construction: blocks still being written
 *
 * @return total blocks minus blocks still under construction
 */
private long getCompleteBlocksTotal() {
  // Calculate number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // Fix: this call was outside the try block, so an exception here
    // would skip readUnlock() and leak the read lock. It must run
    // under the same try/finally that releases the lock.
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    // total blocks - under-construction blocks = complete blocks
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock();
  }
}
接下来看看setBlockTotal方法
/**
 * Set total number of blocks.
 *
 * Article example: with 1000 usable blocks and threshold = 0.999,
 * blockThreshold works out to 999.
 */
private synchronized void setBlockTotal(int total) { // example: assume 1000 usable blocks in total
this.blockTotal = total;
// threshold = 0.999f in the article's example.
// With 1000 blocks, blockThreshold = 999: safe mode can be left
// once 999 blocks have been reported.
this.blockThreshold = (int) (blockTotal * threshold);
// Also 999 in the example (controls when repl queues initialize).
this.blockReplQueueThreshold =
(int) (blockTotal * replQueueThreshold);
if (haEnabled) {
// After we initialize the block count, any further namespace
// modifications done while in safe mode need to keep track
// of the number of total blocks in the system.
this.shouldIncrementallyTrackBlocks = true;
}
if(blockSafe < 0)
this.blockSafe = 0;
// Re-evaluate whether safe mode must be entered.
checkMode();
}
进入核心方法
/**
 * Check and trigger safe mode if needed.
 */
private void checkMode() {
// Have to have write-lock since leaving safemode initializes
// repl queues, which requires write lock
assert hasWriteLock();
if (inTransitionToActive()) {
return;
}
// if smmthread is already running, the block threshold must have been
// reached before, there is no need to enter the safe mode again
// needEnter() is the core check deciding whether safe mode is required.
if (smmthread == null && needEnter()) {
// Enter safe mode.
enter();
return;
}
}
needEnter方法判断HDFS是否需要进入安全模式
一共有3个条件
- threshold != 0 && blockSafe < blockThreshold
HDFS的元数据会有上一次关闭集群的总Block数量
假设之前有1000个complete的block,按照比例计算要求的block是999个
threshold = 0.999
blockThreshold = 999
blockSafe就是DataNode向NameNode汇报上来的所有block总数
如果满足所有汇报的block总数 < 阀值(999)就会进入安全模式 - datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold
datanodeThreshold是存活DataNode数的阈值,默认值是0
这个条件也就是如果设置了DataNode存活的阈值,并且当前存活的DataNode小于阈值就会进入安全模式 - !nameNodeHasResourcesAvailable()
这个条件是检查了当时的磁盘可用大小,如果磁盘可用大小不足100M,则会进入安全模式
/**
 * There is no need to enter safe mode
 * if DFS is empty or {@link #threshold} == 0
 *
 * Three conditions force safe mode:
 * 1) threshold (e.g. 0.999) != 0 && blockSafe (blocks reported by DataNodes)
 *    < blockThreshold (e.g. 999)
 * 2) datanodeThreshold (required live DataNodes, default 0) != 0 &&
 *    getNumLiveDataNodes() (currently live DataNodes) < datanodeThreshold
 * 3) !nameNodeHasResourcesAvailable() (disk check computed earlier;
 *    false when free space on a volume is below the ~100MB reserve)
 */
private boolean needEnter() {
return (threshold != 0 && blockSafe < blockThreshold) ||
(datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
(!nameNodeHasResourcesAvailable());
}
blockSafe的值如何来的呢
/**
 * Increment number of safe blocks if current block has
 * reached minimal replication.
 * @param replication current replication
 */
private synchronized void incrementSafeBlockCount(short replication) {
// Invoked as DataNodes block-report to the NameNode.
if (replication == safeReplication) {
// Starts at 0; incremented once per block that reached minimal
// replication, i.e. the cluster-wide count of safely reported blocks.
this.blockSafe++;
// Report startup progress only if we haven't completed startup yet.
StartupProgress prog = NameNode.getStartupProgress();
if (prog.getStatus(Phase.SAFEMODE) != Status.COMPLETE) {
if (this.awaitingReportedBlocksCounter == null) {
this.awaitingReportedBlocksCounter = prog.getCounter(Phase.SAFEMODE,
STEP_AWAITING_REPORTED_BLOCKS);
}
this.awaitingReportedBlocksCounter.increment();
}
// Re-evaluate safe mode after each counted block.
checkMode();
}
}
到这里NameNode的启动流程就告一段落了