Scheduler模块主要负责stage的划分,以及job的调度及submit。是整个spark计算流程中比较重要的部分。
�1.从saveAsTextFile开始
��阅读该部分代码可以从任意一个action方法开始,例如saveAsTextFile方法:
一路跟下去直到PairRDDFunctions.saveAsHadoopDataset中有一行self.context.runJob(self,writeToFile),如下:
该方法中入参writeToFile是用来定义stage执行逻辑的函数(要注意的是scala里函数和方法是有区别的),该函数运用了closure特性在实际运行过程中不断的针对stage的各个partition信息(不同的partition输出位置等属性不同)来重新初始化自由变量writer。
而Utils.tryWithSafeFinallyAndFailureCallbacks方法则是一个被curry化的函数,封装了固定的异常处理机制。
因为当前action是saveTextAsHadoopFile操作,所以该函数的功能就是将当前拿到的partition的数据写入到指定路径。
2.DAGScheduler.submitJob
然后看runJob方法,从这里一路跟踪到DAGScheduler.submitJob方法(在跟踪过程中会看到对func序列化的操作),然后会看到这里:
Spark的计算框架是基于Event队列机制运作的,诸如job的submit、cancel,Excecutor的添加、丢失等操作。当需要执行某操作时,会向操作对应的EventLoop中发送Event,该Event会被添加至Queue中,然后顺序处理。如下为DAGSchedulerEventProcessLoop的消息处理逻辑:
3.DAGScheduler.handleJobSubmitted
点进去看到DAGScheduler. handleJobSubmitted方法。
在该方法中,将当前正在运行的job添加至active中,发送SparkListenerJobStart事件,用来监控Job处理的进度,在UI界面上展示。
4.DAGScheduler. submitStage
然后看submitStage方法,该方法是用来提交spark job的,提交时会从整个DAG图的最后一个stage开始进行,逐个查找其parent stage,直到找不到未执行的parent stage后再开始执行当前递归查找到的stage中的tasks。在stage查找其parent stages的过程中,会更新stage的状态变更为waiting、running、failed。下面具体分析一下:
首先系统会判断入参stage是否目前为止还未被调度过(分为因parent stage missing而等待、执行中、执行失败三种状态)。这里的missing应该是指未被系统检测到也就是待计算的意思。因为查找是从DAG图的最后一个stage开始的,在查找开始前其parent都是missing的。
如果是则通过getMissingParentStages获取其missing的parent stage。getMissingParentStages稍后再解释。
如果stage没有parent,则说明当前DAG分支已经找到了source,这时候直接提交stage的task即可。提交task的方法为submitMissingTasks。
如果找到了parent,则依次将其parent全部提交,然后递归查找其parent的parent stage,同时将当前stage添加到waitingStages,直到job完成或失败后,stage会被-=掉,参考(DAGScheduler. markMapStageJobAsFinished方法)。
submitStages方法执行后,会调用submitWaitingStages方法,将当前waiting的stages按照升序提交一下。
5.DAGScheduler. getMissingParentStages
下面看一下getMissingParentStages方法:
该方法用来查找一个stage的parent stage的,也就是我们所说的划分stage的逻辑。因为stage是由rdd组成,划分stage是基于rdd之间的依赖关系是否为shuffleDependency(宽依赖)来决定的。代码中可以看到,如果是宽依赖则构建一个stage,如果是窄依赖,则继续向上查找。
在该方法中构建了一个stack waitingForVisit,用来存储当前迭代到的不是shuffleDependency的rdd。
将当前stage的rdd push到这个stack中,调用visit方法判断其Dependency类型。Visit方法首先会判断这个rdd是否已经计算完毕,判断依据为该rdd的partition是否都有了输出。
如果没有计算完成,则判断其Dependency类型,如果为宽依赖,则封装一个stage添加到missing中。如果未窄依赖,则获取该窄依赖的rdd,将其push到stack中,待下一次继续查找。
直到最终返回missing。