NOTE:本文要求读者对spark的运行原理有基本的了解。
需要明确的一点是,abort不同于fail。如果一个stage fail了,那么它还有可能被resubmit,然后重新执行。而如果一个stage abort了,则将无法再次执行。
首先,为什么要abort一个stage?
显然,如果spark认为即使执行该stage,也会以失败告终,那么就没必要继续执行了。此时,就会abort该stage。
其次,什么时候abort一个stage?
主要有两种情况:一、在stage提交tasks过程中,发生了错误;二、 该stage多次执行失败。
具体的有:
- 如果没有一个active的job需要该stage,则abort:
abortStage(stage, "No active job for stage " + stage.id, None)
- 在该stage的submitMissingTasks过程中:
a) 如果tasks创建失败,则abort:
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
b) 如果tasks没有序列化,则abort:
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
c) 如果tasks序列化失败,则abort:
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
- 如果该stage中的某个task的失败次数超过阈值,则abort该task所在的TaskSetManager,进而导致该stage abort:
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
index, taskSet.id, maxTaskFailures))
abort("Task %d in stage %s failed %d times, most recent failure: %s\nDriver stacktrace:"
.format(index, taskSet.id, maxTaskFailures, failureReason), failureException)
return
}
- 如果一个stage因FetchFailed连续失败次数超过阈值,则abort:
val shouldAbortStage = failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts
if (shouldAbortStage) {
s"""$failedStage (${failedStage.name})
|has failed the maximum allowable number of
|times: $maxConsecutiveStageAttempts.
|Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
}
abortStage(failedStage, abortMessage, None)
What's more:
当然,还有很多TaskSchedulerImpl
在处理tasks过程中产生的异常,引起的TaskSetManager
的abort,进而导
致stage的abort的情况。在此不再赘述。个人觉得,主要的还是关注stage因多次执行失败,而造成的abort比较重要。
最后,abort一个stage会有什么影响?
显然,一个stage abort,意味着那些依赖于该stage的active jobs都会执行失败。所以,我们需要fail这些jobs。
而如果我们要fail一个job,那么,也会fail掉(并标记为结束)该job中的所有stages。但是,事实上,并不是一个job中的所有stages都会被fail。这里有个条件:如果该stage只被这一个job所依赖,才能fail掉该stage。因为如果有多个jobs依赖一个stage,而如果我们fail掉了该stage,就会导致其他需要使用该stage的jobs因为该stage的fail而失败。
这里有个问题,如果需要被abort的stage被多个jobs所依赖,那么根据上述的条件,该stage最终就不能真正的abort了吗?
其实不是这样的。
假设我们有3个jobs(jobA,jobB, jobC)依赖了该需要被abort的stage0。当jobA尝试fail掉它所有依赖的stages时,发现stage0同时被其它2个jobs所依赖,于是放弃fail该stage,转而检查其它的stages。
虽然,最终,jobA没有fail掉所有依赖的stages,比如stage0。但它依然会fail掉自己。当jobA fail的时候,就会清理相关的数据结构。比如,依赖stage0的jobs就只剩下jobB和jobC了。
等到jobB fail它所依赖的stages的时候,发现stage0同时被另一个job所依赖,于是也放弃fail stage0。当jobB fail时,清理完相关数据结构。此时,依赖stage0的job就只剩下jobC了。
那么,等到jobC fail它所依赖的stages的时候,发现stage0此时只有一个job依赖,那就是jobC自己。既然我jobC就要fail了,那么留着stage0显然也没有什么用了。如果该stage0正在running,显然是一种资源的浪费。所以这种情况下,还要kill掉其中正在运行的tasks。最后,jobC才fail掉自己,并再次清理相关数据结构。
最终,stage0也就被fail掉了。