Spark3.0系列(二)Master启动流程源码解析

问题

  • 在Spark的standalone 模式中Master是如何启动?
  • 在Master启动后又做了哪些操作?

Master启动流程

从Spark2.0开始就以及删除了akka的相关通信代码,所以3.0的rpc通讯采用的还是2.0之后的Netty来作为底层通讯框架;SparkMaster的启动类(org.apache.spark.deploy.master.Master.scala),在这个class中的main函数是启动Master的开始,我们就从这里开始。
主要启动流程如下:

  1. 设置多线程的异常处理器
  2. 获取和解析SparkMaster参数信息
  3. 启动RpcEnv
  4. 注册Endpoint
  5. 等待Master优雅关闭

1. 设置多线程的异常处理器

因在多线程中其异常无法准备获取,需要进行设置线程的异常处理器来进行处理.


设置异常处理器

2. 获取和解析SparkMaster的参数信息

参数解析

在SparkConf中主要从系统的环境变量中加载因spark开头的环境变量参数


SparkConf

解析的参数信息会存放在settings中。
而MasterArguments主要是用来解析Master启动时通过args传递的参数,以及Spark配置文件中的信息(默认为SPARK_HOME/conf/spark-default.conf文件):


MasterArguments

3. 启动RpcEnv

在Master的main中会调用startRpcEnvAndEndpoint,这个函数来负责启动Rpc并注册Endpoint(Master)


startRpcAndEndpoint

其中SecurityManager主要是SparkConf中获取是否启用了安全和权限信息等。


RpcEnv
主要创建RpcEnv的是采用抽象工厂模式来创建NettyRpcEnv的(之前还有akka,现在只剩下netty了)。
接下来看看NetyRpcEnv是如何创建的RpcEnv的
NettyRpcEnv

1、先new出NettyRpcEnv实例,在NettyRpcEnv进行构造函数初始化时,会初始化一些主要的类,如Dispatcher、TransportContext、NettyRpcHandler,以及outbox。
2、判断是否是客户端模式,非客户端模式需要启动服务器,主要用TransportContext来创建服务器


创建服务器
在创建服务器完成后,会注册一个验证的Endpoint(RpcEndpointVerifier),后面会用这个Endpoint来验证给定的EndpointName是否存在之类的操作。
在启动服务器时如果端口被占用,会按照算法进行算出下一个端口,如果是测试模式,最大重试100次,非测试模式,最大重试16次,如果重试次数结束都无法创建服务器,则报错,启动Master失败。

4.注册Endpoint(Master)

注册Endpoint主要使用RpcEnv#setupEndpoint方法来完成注册,这个方法内部又调用了Dispatcher的registerRpcEndpoint来进行记录Endpoint。当注册完成后会返回一个EndpointRef,后面可以用这个Ref来向Endpoint发送rpc请求。


RegisterMaster

接下来开始构造Master,当构造完成后会调用OnStart方法(在创建Dispatcher时,Dispatcher会初始化ShareMessageLoop,当注册Endpoint时会用到MessageLoop#register函数,这个函数会对每个注册的Endpoint都创建一个InBox,且在new InBox时,会向里面放OnStart消息作为Endpoint的InBox的第一个消息)
在onStart方法中,主要启动web ui和rest server以及一些指标系统;和leader选举等。


onStart

leaderElection

当master服务器启动后,会使用EndpointRef向Master发送BoundPortsResponse消息来获取Master启动的web ui端口和rest端口。
到此Master启动完成,awaitTermination等待Master关闭,这个是使用CountDownLatch来实现的。

后面就等待Worker来连接、Driver来提交作业。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。