本文基于Spark 1.6.3源码,采用一步一步深入的方式来展开阅读,本文是为了纪录自己在阅读源码时候的思路,看完一遍真的很容易忘记,写一篇文章梳理一遍可以加深印象。
SparkContext:Spark应用的入口
SparkContext是用户应用于Spark集群交互的主要接口,所以把SparkContext作为入口来展开executor的源码阅读,主要针对standaone模式下的executor模块。
SparkContext通过调用createTaskScheduler()方法来创建两个重要的类:TaskScheduler和SchedulerBackend
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
// 在DAGScheduler的构造中持有TaskScheduler的引用之后,开始TaskScheduler
_taskScheduler.start()
两个重要的类
TaskScheduler类:低级的task调度接口,仅有一个实现类为:TaskSchedulerImpl,这个类的作用是为高级task调度接口DAGScheduler划分好的stage分配TaskSet,然后提交给Spark集群,处理Task的运行消息,并将event返回给DAGScheduler,这里可以看出DAGScheduler实例化后持有了TaskSchedulerImpl的引用,有关DAGScheduler与TaskSchedulerImpl配合的调度机制,在后面的文章中展开。
SchedulerBackend类:调度的后台接口,实现类有很多,根据传入的master url采用模式匹配的方式来确定需要什么实现类,主要的作用是当有新的task或者资源变动时找到合适的executor来分配资源,或者是处理从TaskSchedulerImpl发出杀掉Task请求。
在standalone模式中,SchedulerBackend的具体实现类为:SparkDeploySchedulerBackend,通过以下createTaskScheduler()方法中的截选代码可以了解这个过程:
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
scheduler.initialize(backend)表明了TaskSchedulerImpl持有backend的引用,且在这个方法里初始化了用于FIFO和FAIR两种调度模式的容器和池,这部分放到调度模块展开。
至此为止,两个重要的类的实例已经构造完毕:TaskSchedulerImpl和SparkDeploySchedulerBackend
driverEndpoint和appClient的初始化
紧接着,调用了TaskSchedulerImpl的start()方法,在start()方法中首先调用了backend的start()方法
override def start() {
backend.start() //调用SchedulerBackend的start()方法
// 如果开启了推测执行功能的话,就开启一条speculation线程来计算,参数是通过配置文件的参数来传入,或者使用默认值
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
SparkDeploySchedulerBackend的start()方法首先调用了super的start()方法,这里需要说明的是SparkDeploySchedulerBackend并不是直接继承自SchedulerBackend,而是继承自CoarseGrainedSchedulerBackend,CoarseGrainedSchedulerBackend继承自SchedulerBackend
这样的话,最后其实调用的是CoarseGrainedSchedulerBackend的start()方法,代码如下:
override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}
start方法中注册了DriverEndpoint,调用createDriverEndpoint方法创建了一个DriverEndpoint的实例,至此DriverEndpoint创建完成,DriverEndpoint在实例化的过程中,会去调用生命周期中onstart方法,在onStart方法中会周期性的执行以下代码:Option(self).foreach(_.send(ReviveOffers))
即自己给自己发送ReviveOffers的消息,收到ReviveOffers消息后会调用makeOffers方法选出合适executor然后分配资源。
SparkDeploySchedulerBackend在start方法中,还创建了AppClient实例:
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
AppClient实例封装了关于application的一些信息ApplicationDescription,如appName,maxCores,executorMemory等
client.start()方法中注册了AppClient中的通信端ClientEndpoint
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
// 注册appClient的rpcEndpoint
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
在注册ClientEndpoint的过程中,调用其生命周期中的onstart方法
override def onStart(): Unit = {
try {
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
至此,DriverEndpoint和AppClient都已经实例化完成
DriverEndpoint已经准备好了,一旦有新的application提交或是集群的资源发生了变化,即调用makeoffers方法去分配资源;
AppClient在注册ClientEndpoint的过程中,将要调用registerWithMaster将application注册请求提交给Master。
registerWithMaster之后的剖析将会放在下一篇文章里继续深入。