1. Important internally maintained structures
// Handle for the DAGScheduler's metrics; covered later
private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
// Job-related counters mentioned earlier
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
// Several maps: job -> stages, shuffle -> stage, and job -> its running instance (ActiveJob)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
/**
* Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
* and its values are arrays indexed by partition numbers. Each array value is the set of
* locations where that RDD partition is cached.
*
* All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
// every task. When we detect a node failing, we note the current epoch number and failed
// executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
// This is where the DAG blueprint is turned into concrete tasks pushed down to the TaskScheduler; covered in detail later
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
The EventProcessLoop is covered in detail later. For now, think of it as a product manager filing the work as Jira tickets so it actually gets done: from blueprint to execution.
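To make that concrete, below is a minimal, self-contained sketch of the event-loop pattern that DAGSchedulerEventProcessLoop follows: a single daemon thread drains a blocking queue and dispatches each event to a handler. The class and method names here are illustrative, not the actual Spark internals.
import java.util.concurrent.LinkedBlockingDeque
// Illustrative sketch only, not the real Spark class
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take()   // block until something is posted
          onReceive(event)                // dispatch to the concrete handler
        }
      } catch {
        case _: InterruptedException =>   // interrupted by stop(): just exit
      }
    }
  }
  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)
  // Subclasses pattern-match on the event here, exactly like doOnReceive below
  protected def onReceive(event: E): Unit
}
Spark's real event loop adds error handling around this, but the flow is the same: submitJob posts an event, and the loop's single thread picks it up and calls the matching handleXxx method on the DAGScheduler.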
2. submitJob / runJob
SparkContext hands the job to the EventProcessLoop, where it is actually executed.
The outer entry point is runJob; runJob then calls submitJob to run the job.
SparkContext's runJob is triggered from inside the RDD: every time an RDD action runs, for example
rdd.aggregate
its implementation calls sc.runJob().
This is what we meant earlier when we said that every RDD action triggers the DAGScheduler to start executing work.
There is an important tuning point here: avoid calling foreach on an RDD whenever possible. Programmers used to the lambda style tend to reach for it out of habit, but underneath it is an action.
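As a small, self-contained illustration (only public RDD APIs; the local master and sample data are my own), note how transformations stay lazy while each action, including foreach, launches a job via sc.runJob:
import org.apache.spark.{SparkConf, SparkContext}
object ActionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("action-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000)
    // Transformations are lazy: nothing has been scheduled yet
    val doubled = rdd.map(_ * 2)
    // Actions call sc.runJob() under the hood, so each one is a full Spark job
    doubled.foreach(_ => ())            // job #1, run only for side effects
    val total = doubled.sum()           // job #2
    println(s"sum = $total")
    sc.stop()
  }
}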
/**
* Run an action job on the given RDD and pass all the results to the resultHandler function as
* they arrive.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @throws Exception when the job fails
*/
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      // success path
    case JobFailed(exception: Exception) =>
      // failure path
  }
}
- Check that the partitions of the rdd being executed are valid
- Increment nextJobId
- If the rdd has no partitions to compute, there is nothing for this job to run, so return immediately
- Otherwise, package the job id, the corresponding partitions, the rdd and the function to run into an event for the EventProcessLoop, and return the result in the form of a JobWaiter the caller can wait on (a simplified sketch follows)
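A simplified sketch of submitJob along these lines is shown below. It is not the verbatim Spark source (signatures and details differ between versions), but it mirrors the steps above and uses the fields introduced in section 1 (nextJobId, eventProcessLoop):
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // 1. Reject partition ids that do not exist in the target RDD
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      s"Attempting to access a non-existent partition: $p. Total number of partitions: $maxPartitions")
  }
  // 2. Allocate a new job id
  val jobId = nextJobId.getAndIncrement()
  // 3. Zero partitions means there is nothing to run: return an already-finished waiter
  if (partitions.isEmpty) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  // 4. Otherwise post a JobSubmitted event to the event loop and hand the waiter back
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _],
    partitions.toArray, callSite, waiter, properties))
  waiter
}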
3. EventProcessLoop
From the above we have followed the path from an RDD operation, through SparkContext submitting the job, to the point where it actually runs.
The job request, which you can think of as the customer's final requirement, has now entered a structure called the EventProcessLoop.
This is a task queue maintained inside the DAGScheduler, again designed around the listener model.
Readers familiar with Python Celery or Apache Storm will find it easy to see what the DAG scheduler does here. Once the final task enters this pipeline, the scheduler works backwards from the final requirement, using the dependency information stored in the RDDs, to derive every task that must run first, and pushes each of them into the same pipeline. The idea is very similar to the pipelined tasks of a traditional stream-processing system.
It is as if the customer ultimately wants product D: D goes onto the task list, and the company announces internally, "get D's prerequisite C onto the list as well". Repeat this until you reach an "A" with no prerequisites at all, or a "B" that happens to be done already. Either way, after a few rounds of announcements, every task that needs to run has been pushed into the Stage/Job structures described above, and execution can then simply start from the front. A toy sketch of this backward derivation follows.
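The toy code below (entirely illustrative, with made-up task names, not Spark code) captures that "announce the prerequisites" loop: start from the final task and keep pushing unfinished prerequisites until only the work that actually has to run is left.
import scala.collection.mutable
object BackwardDerivationDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical dependency graph: each task lists the tasks it needs first
    val prerequisites = Map(
      "D" -> Seq("C"),
      "C" -> Seq("A", "B"),
      "A" -> Seq.empty[String],
      "B" -> Seq.empty[String])
    val alreadyDone = Set("B")              // "B" happens to be finished already
    val toRun = mutable.LinkedHashSet[String]()
    val pending = mutable.Stack("D")        // start from the final requirement
    while (pending.nonEmpty) {
      val task = pending.pop()
      if (!alreadyDone(task) && !toRun(task)) {
        toRun += task                                // this task still has to run
        prerequisites(task).foreach(pending.push)    // "announce" its prerequisites
      }
    }
    println(s"tasks that still need to run: ${toRun.mkString(", ")}")  // D, C, A
  }
}
The actual run order is decided later by only starting tasks whose prerequisites are all finished; the DAGScheduler does the same with waitingStages, which are only submitted once their parent stages complete.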
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // A new job has been submitted: handle it
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // A map stage has been submitted on its own: handle it
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  // The following cases handle cancellation / failure situations
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  // A new executor became available: bring it into the scheduling plan
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  // An executor was lost: remove it from scheduling
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  // A task result is being fetched, usually the intermediate output of the upstream work the current task depends on
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  // A whole task set failed
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  // Recovery from failure: resubmit failed stages
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
4. From an RDD's dependencies to a DAG
- As shown above, once submitJob is called, a JobSubmitted event is fired on the event loop
- We also know that every submitJob starts from an RDD action, and every action leads to a stage being created; this is the newResultStage call below
private[scheduler] def handleJobSubmitted(
    ... ) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    // error handling elided
  }
- So we create a stage, give it a stageId, and then fill the stage in
/**
* Create a ResultStage associated with the provided jobId.
*/
private def newResultStage(
    ... ): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  // Register the new stage; its id comes from a globally increasing counter
  stageIdToStage(id) = stage
  // Record the mapping from the job to this stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
- Following the code further, we find that right after this stage is created, the scheduler checks whether everything the stage depends on is already in place; the getParentStages method is called here to find or create the parent stages
- From this we can also see that a job is split into multiple stages internally, which is also visible in the Spark web UI
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          // Walk the RDD dependency graph upwards; every shuffle dependency
          // produces a separate stage
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          // Narrow (non-shuffle) dependencies are simply pushed onto the stack to visit
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
- There are a few other code paths in DAGScheduler that implement this process; they will be covered later [TODO]
- An RDD shuffle operation becomes the boundary between stages: every shuffle triggers the creation of a new stage.
- Non-shuffle (narrow) operations are instead pushed onto the work stack and stay inside the current stage, which together with its associated job forms the work to execute. A small runnable example of this stage splitting follows.
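To see the stage boundary in practice, here is a small self-contained example (my own code using only public APIs; the app name, data and listener are illustrative). The reduceByKey introduces a shuffle dependency, so the single collect job runs as two stages, which a SparkListener can count and which also shows up in the web UI:
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
object StageSplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split-demo").setMaster("local[2]"))
    // Count completed stages so we can observe how the job was split
    val completedStages = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        completedStages.incrementAndGet()
      }
    })
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
    // map is a narrow dependency and stays in the same stage;
    // reduceByKey introduces a ShuffleDependency, which starts a new stage
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _).collect()
    sc.stop()  // stop() drains the listener bus before returning
    println(s"result = ${counts.toMap}, stages completed = ${completedStages.get()}")  // expect 2
  }
}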