Datanode 启动流程源码分析
本文尝试讲解 Datanode 的启动流程源码分析,内容包括
- Datanode 类注释
- Datanode 启动流程分析
- Datanode 注册流程分析
- Datanode 心跳流程分析
- Namenode 心跳监控分析
1. Datanode 类注释
DataNode is a class (and program) that stores a set of blocks for a DFS deployment.
A single deployment can have one or many DataNodes.
Each DataNode communicates regularly with a single NameNode.
It also communicates with client code and other DataNodes from time to time
DataNode 是一个类(进程),它为 DFS 存储block集合。
单个部署可以有一个或多个 DataNode.
每个 DataNode 有规律的与单个 NameNode 通信。
它也会不定时与客户端代码和其他的 DataNode 通信
DataNodes store a series of named blocks. The DataNode
allows client code to read these blocks, or to write new
block data. The DataNode may also, in response to instructions
from its NameNode, delete blocks or copy blocks to/from other
DataNodes.
DataNode 存储命名叫 block 的序列。DataNode 允许客户端代码读取这些 block,或者写新的 block 数据。
dataNode 可以相应来自 DataNode 的指令,删除块,或者拷贝块 到/从 其他 DataNode 上
The DataNode maintains just one critical table:
block-> stream of bytes (of BLOCK_SIZE or less)
This info is stored on a local disk. The DataNode
reports the table's contents to the NameNode upon startup
and every so often afterwards.
DataNode 包含一个表,block-> stream of bytes
这个信息是保存在本地磁盘,DataNode 会报告这个表的内容到 NameNode 在启动的时候,以后每隔一段时间一次
DataNodes spend their lives in an endless loop of asking
the NameNode for something to do. A NameNode cannot connect
to a DataNode directly; a NameNode simply returns values from
functions invoked by a DataNode.
DataNode 花费一生,无穷尽的循环着,问NameNode 我要做什么。
NameNode 不能直接连接 DataNode;
NameNode 只是在 DataNode 请求的函数中,简单的回复值
DataNodes maintain an open server socket so that client code
or other DataNodes can read/write data. The host/port for
this server is reported to the NameNode, which then sends that
information to clients or other DataNodes that might be interested.
DataNode 维护一个开放的服务socket,因此客户端代码或其他的DataNode可以读写数据。
服务的 host/port 会报告给 NameNode,发送信息到 可能感兴趣的 客户端或其他的 DataNode
DataNode 维护开放的服务器套接字,以便客户端代码或其他 DataNode 可以读写数据。
该服务器的主机/端口报告给NameNode,然后NameNode将该信息发送给可能感兴趣的客户端或其他 DataNode。
2. Datanode 启动流程分析
-
找到 main 方法,可以看到就一行有用的代码
secureMain(args, null);
,直接跟进
-
跟进后,发现这个行代码
DataNode datanode = createDataNode(args, null, resources);
,这个应就是创建一个Datanode,直接跟进去看看
-
可以看到
DataNode dn = instantiateDataNode(args, conf, resources);
,这个看着像是 实例化 datanode,再跟进
-
本方法的前几行都是些判断,直接看最后一行
makeInstance(dataLocations, conf, resources);
,再跟进
-
前面几个都是各种校验,先记录下,然后看最后
new DataNode(conf, locations, resources);
-
进入 Datanode 构造方法后,会发现代码比较多,不过前面都是赋值和判断,直接往下看,会看到一句代码就是
startDataNode(conf, dataDirs, resources);
,我们进入此方法看看;
-
startDataNode
方法又是很多代码,我们忽略前面的赋值和判断,直接往后看,然后我们发现这里有几行代码storage = new DataStorage();
initDataXceiver(conf);``startInfoServer(conf);
,这几行代码虽然看着不像 RPC,但是在这里出现这比较可疑,先简单标记下。然后在往后看就发现一行代码initIpcServer(conf);
,这个应就是RPC服务了,进去看看
-
果然在里面发现了
RPC.Builder
。
-
我们还发现,从始至终,我们跟踪代码就没离开 Datanode.java,所以 Datanode.java本身应就是RPC服务。我们可以证明下:我们发现 Datanode 实现了很3个接口
在看接口里又都有versionID
字段,这就证明Datanode就是RPC服务了
-
虽然证明 Datanode 是RPC服务,也找到了其构建的代码,但是 Datanode 在哪里启动的呢?我们可以搜索
ipcServer.start()
,因为 RPC服务就是通过 start 方法启动的。
-
我们发现是
runDatanodeDaemon()
调用了ipcServer.start()
,我们在找下是哪个方法runDatanodeDaemon()
,然后就发现,原来是createDataNode
方法调用了它,而createDataNode
方法不就是 第 3 步的代码吗
至此 Datanode 启动流程分析完成
简单记录下,Datanode启动过程中遇到的一些可疑的对象
- DataStorage
- DataXceiverServer
- HttpServer2
- IpcServer
3. Datanode 注册流程分析
第7步哪里是初始化 RPC 服务,后面还有很多代码,我们继续看看有没有什么重要的代码
-
我们在 RPC 服务初始化后面看到,又创建了一个
blockPollManager
,看下这个对象的注释,然后又调用了blockPoolManager.refreshNamenodes(config)
这个代码,这个代码看着不像什么重要代码,但其实这里是非常重要的,Datanode 的注册和心跳都在这里
-
跟进
refreshNamenodes
代码看看,发现重要的代码就是最后一行doRefreshNamenodes(newAddressMap);
-
跟进
doRefreshNamenodes(newAddressMap);
后发现这个代码不少,这个只能慢慢看了。还好有几个注释。看步骤1,步骤2说是检查和判断类的,就先过了。步骤3 是启动一个nameservices
服务,这个看着比较重要那就仔细看看。
-
仔细看看步骤3的代码,发现重要的也就是
for
循环了,这里应该是拿到每一个nameservice
的地址,然后去创建了一个BPOfferService
对象。那我们看看这个createBPOS(addrs);
代码。
-
这里就一行代码,继续跟进
-
这里又一个for循环,这个for循环的意思就是循环创建
BPServiceActor
对象然后添加到bpServices
列表中,然后就结束了。也就是说每个BPOfferService
对象有一个BPServiceActor
类型的列表;
-
那么
BPOfferService
与BPServiceActor
到底是什么呢
这里简单说下,我们都知道 Namenode 有HA 和 联邦,Namenode 联邦中对应有多个HA。其实这里的每个BPServiceActor
就对应一个 Namenode,而BPOfferService
对应的就是一个HA集群。在代码 4 中,最后把每个BPOfferService
又添加到了offerServices
列表中。然后又来了一个startAll()
方法。
-
startAll()
中又是一个循环,可以看出,这里其实就是调用了每个BPOfferService
对象的start
方法
-
然后跟进后,可以看到,最终是调用了
BPServiceActor
的start()
方法
-
继续跟进后,发现在
BPServiceActor
的start
方法中,new
了一个线程,然后又调用了线程的start
方法。 既然是调用了线程的start
,那么本质上就是调用了线程的run
方法。那么我们跟进run
方法。
-
搜索发现
run
方法,此方法就是个while
循环,不断执行connectToNNAndHandshake();
直到成功。 那么继续跟进connectToNNAndHandshake();
。
-
先看代码注释,这里首先是获得一个NN代理,然后又获得 namespace 信息,最后进行注册
-
跟进注册代码,然后可以看到又一个while循环,里面就是注册了。
-
再次跟进后看到一个RPC代理。那么既然是RPC代理,那我们就该找到它实际的代理协议是哪个了。
-
其实到这里直接看
NameNodeRpcServer
即可。为什么呢?我们看下此类的注释,大概意思就是该类会把对DatanodeProtocol
的请求发送到RPC服务器。而DatanodeProtocol
所对应的RPC服务器当然是NameNodeRpcServer
了。
-
进入
NameNodeRpcServer
后,找到registerDatanode
方法,可以猜到 核心代码就是namesystem.registerDatanode(nodeReg);
-
没什么可说的,继续跟进
-
进入
registerDatanode
后,发现代码很多。前面都是跟注册不太相关的,直接到后面看到addDatanode(nodeDescr);
。这里既是实际的注册了。
-
跟进
addDatanode(nodeDescr);
代码看看。可以看到,这里其实就是对各种集合赋值。然后就结束了。
到这里整个Datanode 注册的流程就结束了。我们总结下
4. Datanode 心跳流程分析
- 读Datanode心跳代码我们需要回到Datanode的BPServiceActor类的run方法。我们知道了
connectToNNAndHandshake();
方法是注册,那么在接下来的代码中有一个offerService()
,这个方法就是Datanode的心跳。
- 进入
offerService()
方法后,一下就看到注释说每隔一段时间,会发送心跳和阻塞报告。先看下if判断if (startTime - lastHeartbeat >= dnConf.heartBeatInterval)
这里就是判断时间间隔的,那么具体的时间间隔是多少呢?其实是3秒,具体的可以自己在此类中搜索heartBeatInterval
的赋值就可以看到。在往下看几行,就找到了HeartbeatResponse resp = sendHeartBeat();
这个应该就是发送心跳了。(HeartbeatResponse
里还包含Namenode返回的让Datanode执行的命令,这个最后再看)
-
好,我们再次进入查看。可以看到前面的几行代码就是获得一些报告信息和概述等,这些信息都被最后一行发送出去了,那就直接跟进最后一行。(这就是为什么我们能在Namenode的50070服务上能看到很多Datanode的信息,心跳的时候带了很多Datanode的信息过去)
- 这里可以看到刚开始获得一个builder,然后把这个 builder 传给
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
这个代码使用了。这里使用到了rpcProxy
。又是DatanodeProtocolClientSideTranslatorPB
这个类,这个类会把请求都转发给RPC服务端。在这里也就是NameNodeRpcServer
。
- 进入
NameNodeRpcServer
找到sendHeartbeat
方法。可以看看最后一行namesystem.handleHeartbeat
就是心跳的核心代码
- 进入
namesystem.handleHeartbeat
后,只有blockManager.getDatanodeManager().handleHeartbeat
像是处理心跳的代码。直接跟进。
- 在这里简单看下,第一个if应该是获得Datanode信息,第二个应该是检测Datanode是否死亡,第三个就是判断如果没有Datanode,或者Datanode已死那么直接返回个其他的命令。在这三个if 之后,终于看到了
heartbeatManager.updateHeartbeat
,很明显,这就是跟新心跳吧。
- 直接进入
heartbeatManager.updateHeartbeat
的代码。又发现一个Update,直接跟进
-
还是一个update,在跟
- 虽然这里的代码比较多,但是仔细一看,前面大部分都是一些检查。直到出现了几个setXXX的方法。可以看到有一个
setLastUpdate(Time.now());
方法。看着就像是把当前时间设置为最后更新时间一样。
- 跟进一下,就看到这里其实就是一个普通的setter方法,这个是给
lastUpdate
属性赋值。
-
后面的代码其实就是更新Datanode报上来的报告
总结
5. Namenode 监控 Datanode 心跳流程分析
-
之前在讲 Namenode 安全模式的代码时,其实后面还有一个重要的代码就是,Namenode启动了几个重要的线程。我们跟进去看看。
- 可以看到有
datanodeManager.activate(conf);
- 我们跟进
heartbeatManager.activate(conf)
看看。
-
看到调用了线程的start方法,肯定有要看Run方法了
- 可以看到每5秒进行一次
heartbeatCheck();
(时间间隔在配置文件里)
-
heartbeatCheck();
每个5秒遍历Datanode,判断最后心跳时间是否超时,(超时时间是10分30秒,具体的看配置),如果超时了,那么下面就会进行一些移除datanode信息的操作,如果没有超时的,等待5秒再检查一次。