Master 和 Worker关系图

总结
- master:通过读取配置,创建actorSystem,反射调用master,master启动后,执行生命周期方法,
preStart和receiveWithLogging,定时val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000清理失去心跳的Worker - worker:通过读取配置,加载worker所在服务器的cpu cores,memory大小等信息,创建actorSystem,反射调用worker,worker启动后执行生命周期方法
preStart和receiveWithLogging,向master注册信息,最重要的信息worker的cpu cores和memory资源大小,定时向master报心跳val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4,防止被master清理 - 所以master会保存worker各个节点的资源信息,与保持心跳,作为后续执行job资源分配,调度的基础

Spark中start-all.sh脚本


Master
1.查看master启动脚本start-master.sh
start-master.sh脚本中可以看到master启动的时候,启动的是org.apache.spark.deploy.master.Master类,所以要看源码,从这个类查看,在从Master伴生对象main方法入手

2. 源码分析
main方法主要做了以下三件事
- 读取配置
- 创建
ActorSystem - 通过
ActorSystem启动Master服务
image.png
流程1.加载配置文件 2.启动master
val args = new MasterArguments(argStrings, conf)这句代码的功能就是加载配置文件,但是里面有可以借鉴Utils工具类的代码
image.png
image.png
关键点在val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf),主要作用,调用创建了ActorSystem
image.png
startService函数作为Utils.startServiceOnPort(port, startService, conf, name)的参数,
image.png
Utils.startServiceOnPort(port, startService, conf, name)中只是计算出master启动的端口
image.png
所以关键还是要看startService方法,该方法又调用doCreateActorSystem
image.png
所以第一个红框的作用就是读取配置,包括端口信息,创建ActorSystem,第二个红框,通过反射启动Master
image.png
启动Master,Master会走Actor的生命周期方法preStart启动,receiveWithLogging,接收信息
preStart方法中,启动webUi等操作,最重要的是这句代码,代码,启动一个定时器,定时发送给自己一个case objec CheckForWorkerTimeOut,间隔是val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
image.png
Master中最最重要的方法,receiveWithLogging,master启动后,通过该方法接收message做相应的处理,首先查看preStart中,查看定时发CheckForWorkerTimeOut给自己的receive调用的方法,查看源码,
总结:Master启动后,定时发送CheckForWorkerTimeOut,给自己,在receiveWithLogging,调用timeOutDeadWorkers,定时清理超过心跳时间的Worker,从val workers = new HashSet[WorkerInfo]移除
image.png
image.png
Worker
1.查看worker启动脚本start-slave.sh
start-slaves.sh启动start-slave.sh,启动org.apache.spark.deploy.worker.Worker类


2.源码分析
Worker启动跟Master启动几乎一模一样,
- 读取配置,获取
cpu cores和`memeory - 创建
ActorSystem - 反射创建
Worker,Worker启动,调用生命周期方法
image.png
image.png

所以直接看Worker的preStart跟receiveWithLogging
preStart方法中,会创建工作目录WorkDir,启动WorkWebUi,最最重要的是,向master注册,registerWithMaster查看方法,调用tryRegisterAllMasters,获取master uri 比如master:7070,获取master的actor,然后向master发送异步无返回值message,将自己的信息封装到case class RegisterWorker,包括自己的id,ip,port, cpu cores,内存大小信息等,所以此时需要到master的receiveWithLogging查看接收到的RegisterWorker做出什么样的操作



master接收到worker的信息后,将RegisterWorker 的信息封装成一个WorkerInfo(拥有worker的信息,id,ip,port, cpu cores,内存大小信息等),再将workerinfo的信息添加到persistenceEngine持久化起来,然后向worker发送RegisteredWorker,告诉worker注册成功,接着调用调度方法schedule(),这个方法大概是这样的,master可能拥有许多client提交的任务,当资源不足的时候,任务会排队,所以当有新的资源,就是worker加入的时候,如果此时有任务排队,又有资源加入master会调度任务分配资源,就是这个schedule()方法。woker收到注册成功的信息RegisteredWorker,所以此时需要去worker的receiveWithLogging中查看

worker接收到master的信息后,启动定时器,定时
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4向自己发送心跳SendHeartbeat,此时需要在worker的receiveWithLogging方法中查看SendHeartbeat,查看代码,又发送heartBeat给master

master收到心跳后,判断是否存在workerId,如果存在则更新workerInfo的心跳时间,如果不存在,发送信息
ReconnectWorker,让worker重新向注册。












