架构简要介绍在前几片博客提过了
Spark的消息通信原理
通信模块类图
首先看一下Spark的消息通信的类图
最核心的是左上角的虚线框的四个类
首先定义了RpcEnvFactory,RpcEnv两个抽象类,在RpcEnv中定义的是RPC通信框架的启动停止和关闭等抽象方法,在RpcEnvFactoty中定义的是创建的抽象方法。然后下面两个类分别使用Netty对继承的方法进行了实现,分别是NettyRpcEnvFactory和NettyRpcEnv。
在NettyRpcEnv中启动终端点的方法为setupEndpoint,在此方法中会将RpcEndPoint,RpcEndpointRef相互以键值对的形式存放在线程安全的ConcurrentHashMap中。
RpcEnv的object提供反射的方式来创建该类的一个静态实例。
在各模块进行通信的时候,需要调用这些类。比如Master,Worker等。会先使用RpcEnv的静态方法创建RpcEnv实例,然后实例化Master,其中Master是ThreadSafeRpcEndPoint的一个子类,所以创建的Master是一个线程安全的实例。接着通过先前创建的RpcEnv的实例来调用启动终端点方法,把Master终端点和其对于的Ref注册到RpcEnv中去,这一步是由方法setupEndpoint来实现的。
在通信的时候,其他对象获得了Master的终端店的Ref便可以发送消息给Master节点。
Spark启动时通信
意思就是启动过程中的通信,主要是Master和Worker节点之间的通信。
详细过程如下:
Master启动后,随后启动各个Worker节点,Worker启动时和Master启动时流程类似,首先创建通信环境RpcEnv的静态对象,创建一个Worker EndPoint,随后就于Master通信,向Master发送一个注册Worker的消息RegisterWorker。
一个Worker可能回注册到多个Master中去,这时候需要在Worker的tryRegisterAllMasters中创建注册线程池registerMasterThreadPool。然后启动注册线程去注册,每个注册过程:先获取Master的一个终端点EndPoint的引用然后调用registerWithMaster方法。Master收到注册消息,开始对Worker的消息进行验证和记录。注册成功则返回给Worker一个RegisteredWorker消息,否则发送RegisterWorkerFailed消息,worker收到消息后打印出错日志并结束Worker启动。
验证过程:Master维持一张注册列表。接收到Worker的RegisterWorker消息后,Master首先对自己的状态进行验证,如果自己处于StandBy状态则忽略Worker的消息。否则在注册表中查询该Worker的节点,若找到了,则返回RegisterWorkerFailed给Worker,否则调用registerWorker方法把该Worker加入注册表中。若Worker收到成功注册的消息,先记录日志并更新Master消息,则之后会提供定时调度进程发送心跳信息Heartbeat给Master。
Spark运行时消息通信
运行时存在几个对象:
客户端端点 client EndPoint,客户端的驱动程序 Driver,SparkContext,以及Worker中的Executor和worker endpoint,Master的端点Master endpoint。
这些对象之间的协作如下:
1.首先客户端执行应用程序会先创建一个Spark Context,Spark Context在启动过程中先实例化一个ScahdulerBackend对象,在standalone时是创建的SparkDeploySchedulerBackend对象,该对象是Driver EndPoint的子类,在该对象启动过程中又会创建Client Endpoint对象。
此时客户端的对象都创建结束。
之后Client EndPoint对象会有一个tryRegisterAllMasters方法,就和启动时通信中Worker持有的一样,Client EndPoint 向 Master发送注册应用的请求。
Master收到client请求,在registerApplication方法中记录应用信息并把该应用加入到等待运行的应用列表中去。同时调用startExecutorOnWorker方法运行应用,在方法首先获取满足条件的worker节点(内存满足启动Executor所需,核数大于1),最后向该worker发送 LaunchExecutor消息。
Worker收到Master请求后。第一步通过SPARK_EXECUTOR_DIRS环境变量创建Executor的执行目录当程序执行结束之后由Worker剔除。第二步实例化一个Executor对象,在该对象启动中会创建一个进程生成器ProcessBuilder,然后该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象是Executor运行的容器。生成完成后,第三步,worker向Master返回ExecutorStateChanged消息告知Master,Executor容器创建完成。
3中创建的CoarseGrainedExecutorBackend对象的启动方法onStart会向DriverEndPoint发送Executor的注册消息。
Driver EndPoint收到了注册消息RegisterExecutor之后会先判断8Executor*是否已经被注册过了,若是则返回注册失败消息,否则返回注册成功消息,并Driver终端记录该Executor并在makeOffers()中给该Executor分配运行任务资源,发后发生LaunchTask消息。
CoarseGrainedExecutorBackend对象收到注册成功消息后,在CoarseGrainedExecutorBackend容器中实例化Executor对象,启动完毕后定时向Driver发送心跳信息,等待从DriverEndPoint终端点发送执行任务的消息。
CoarseGrainedExecutorBackend对象接受LaunchTask消息后,调用Executor的launchTask,在执行时创建TaskRunner进程。处理完毕后发送StatusUpdate消息返回给CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend对象将StatusUpdate返回给DriverEndPoint,DriverEndPoint收到消息调用taskSchedulerImpl的statusUpdate*。