概要
上篇文章Spark RPC概述介绍了Spark RPC的实现思路,有了上述基础,我们看一个具体例子,就是standalone模式下的Master和Worker,主要关注Worker如何向Master注册信息以及保持心跳。
Master
Master是standalone模式下的主节点,查看定义
Master继承ThreadSafeRpcEndpoint,ThreadSafeRpcEndpoint继承RpcEndpoint(不了解RpcEndpoint的同学点击这里Spark RPC概述)。RpcEndpoint的生命周期又是: onStart -> receive(receiveAndReply)* -> onStop,所以我们理所当然的关心Master的这四个对应方法。
onStart
启动基于jetty的webUI。
-
启动定时任务,默认每60s send CheckForWorkerTimeOut信息给Master(如上图注释处),检测Worker连接情况。根据上篇文章,send方法不需要返回值,信息发送给 Master的receive方法,receive使用scala模式匹配处理信息,如下
上面几段代码,可以清晰的看出Master根据条件lastHeartbeat < currentTime - WORKER_TIMEOUT_MS判断Worker是否还在发送心跳,如果过期将其从对应集合中删除。lastHeartbeat是Worker最后一次连接的时间,后面讲心跳(Heartbeat)时会讲到。
如果配置spark.master.rest.enabled=true,启动rest Server。
启动度量系统,persistenceEngine,leaderElectionAgent分别是关于master recovery和leader选举。
receive
receive方法接收EndpointRef send方法发送的信息,如上面说到的定时检查Worker状态,定义如下
我将源码简单整理,如上图所示,这篇文章只关心第一个部分即检测Worker状态和心跳。
Worker状态的检测上面刚分析过。
-
心跳机制
处理Worker发过来的信息,如果worker信息已经存在则更新lastHeartbeat(在onStart方法中提到),否则重新连接master,也就是注册,下面receiveAndReply方法会讲到。
receiveAndReply
receiveAndReply方法接收EndpointRef ask及其衍生方法发送的信息,定义如下
这部分我们只关心和Worker相关的RegisterWorker,注册Worker信息到Master,查看具体逻辑
判断Master的状态是不是standby以及WorkerId是否已存在。
-
调用registerWorker注册worker信息,查看registerWorker方法
移除dead和unknown worker,然后注册,所谓的注册就是将worker信息添加到workers、idToWorker、addressToWorker三个集合容器中。
接下来是recovery相关,然后返回成功或失败信息给Worker。
onStop
关闭webUI等各种服务,截图省略。
Main方法
最后看下Master的启动,main方法
-
接收参数,包括rpc的host、port(底层netty使用)和webUI server的port,默认如下
startRpcEnvAndEndpoint方法,初始化RpcEnv,注册Master(RpcEndpoint)到RpcEnv,到这里,上面介绍的生命周期onStart -> receive(receiveAndReply)* -> onStop方法开始工作。
启动流程如下,具体请参考Spark RPC之Netty启动。
总结
结合上一篇文章Spark RPC概述讲解了Master的实现及启动过程,Master是RpcEndpoint的具体实现。其中主要关注了三点
1. Master定时检测Worker连接情况(CheckForWorkerTimeOut)
2. Worker信息注册(RegisterWorker)
3. 心跳(Heartbeat)
Master 60秒发送一次消息 给自己,遍历检查所有 worker 是否在 60秒内有发送心跳给 Master,如果有 worker 60秒内没有发送心跳,则从worker列表中 把这个 work 移除。
work 向Master 发送消息注册信息,Master 接收后 检测这个 worker 是否是 dead 和 UnKnow 的,如果是则移除重新添加,没有也添加到。
worker 每隔 Send a heartbeat every (heartbeat timeout) / 4 milliseconds
默认 15秒发送一次心跳到 Master。
上述 Master 和 Worker 通过RPC 通信。
接下来我们看下Worker的实现以及如何向Master注册和发送心跳。
参考:https://blog.csdn.net/u011564172/article/details/56670364