Spark Stage

  • Concept

A stage is a set of parallel tasks ① all computing the same function that need to run as part of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the DAGScheduler runs these stages in topological order.
Each Stage can ② either be a shuffle map stage, in which case its tasks' results are input for other stage(s), or a result stage, in which case its tasks directly compute a Spark action (e.g. count(), save(), etc) by running a function on an RDD. ③ For shuffle map stages, we also track the nodes that each output partition is on.
Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO scheduling is used, this ④ allows Stages from earlier jobs to be computed first or recovered faster on failure. Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI. The latest one will be accessible through latestInfo.
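To make the per-stage bookkeeping described above more concrete, here is a minimal Scala sketch. It is not Spark's code: `SimpleStage`, `SimpleShuffleMapStage`, `SimpleResultStage`, `AttemptInfo` and `fifoOrder` are hypothetical names. It assumes a stage simply records one entry per (re-)execution and exposes the newest one in the spirit of `latestInfo`, and it approximates FIFO ordering by sorting on `firstJobId` and then on the stage id.

```scala
import scala.collection.mutable

object StageModelSketch {
  // Hypothetical record of one execution attempt (Spark keeps this kind of data in StageInfo).
  final case class AttemptInfo(stageId: Int, attemptNumber: Int)

  // Simplified stage: an id, the job that first submitted it, and its attempts.
  sealed abstract class SimpleStage(val id: Int, val firstJobId: Int) {
    private val attempts = mutable.ArrayBuffer.empty[AttemptInfo]
    newAttempt() // the first attempt exists as soon as the stage is created

    /** Record a new attempt, e.g. when the stage is re-run for fault recovery. */
    def newAttempt(): AttemptInfo = {
      val info = AttemptInfo(id, attempts.length)
      attempts += info
      info
    }

    /** The newest attempt, in the spirit of Stage.latestInfo. */
    def latestInfo: AttemptInfo = attempts.last
    def attemptCount: Int = attempts.length
  }

  // The two kinds of stage: output feeds other stages vs. directly computes the action.
  final class SimpleShuffleMapStage(id: Int, firstJobId: Int) extends SimpleStage(id, firstJobId)
  final class SimpleResultStage(id: Int, firstJobId: Int) extends SimpleStage(id, firstJobId)

  /** FIFO-style order: stages of earlier jobs first, then by stage id. */
  def fifoOrder(stages: Seq[SimpleStage]): Seq[SimpleStage] =
    stages.sortBy(s => (s.firstJobId, s.id))

  def main(args: Array[String]): Unit = {
    val shuffleStage = new SimpleShuffleMapStage(id = 3, firstJobId = 1)
    shuffleStage.newAttempt() // simulate a retry after a failure
    val resultStage = new SimpleResultStage(id = 5, firstJobId = 0)
    println(shuffleStage.latestInfo)                             // AttemptInfo(3,1)
    println(fifoOrder(Seq(shuffleStage, resultStage)).map(_.id)) // List(5, 3)
  }
}
```

The attempt list is append-only, so earlier attempts stay available (as the text says, for listeners or the web UI) while `latestInfo` always points at the most recent one.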

  • Code walkthrough

  1. [DAGScheduler]->private[scheduler] def handleJobSubmitted
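{
  var finalStage: ResultStage = null
  try {
    /**
      ② There are only two kinds of stage: shuffle map stages and result
      stages, and the result stage is always the stage that contains the
      RDD on which the action was called. Parameters: func is the
      operation applied to each partition and depends on the action; for
      count, func computes the size of each partition. The per-partition
      results are collected through a JobWaiter (created in submitJob),
      and count() then sums them to produce the final result.
    **/
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  ...
  /**
    [1] The first stage submitted is always finalStage, i.e. the
    ResultStage. submitStage then recursively looks up the stage's
    parent stages until it reaches a stage with no missing parents;
    only such a stage is turned into a TaskSet and submitted.
    [2] During that recursive search, a stage that still has missing
    parents is put into the waiting queue (waitingStages) so that it
    can be scheduled later.
  **/
  submitStage(finalStage)
  submitWaitingStages()
}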
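Comment ② says that func runs once per partition and that the per-partition results are combined afterwards. The sketch below mimics that flow for count() without Spark: a hypothetical `runJob` applies `func` to every partition and hands each `(partitionIndex, result)` pair to a result handler, roughly the role the JobWaiter plays, and `count` sums the collected sizes at the end. Modeling partitions as plain Scala sequences is an assumption made only for illustration.

```scala
object CountJobSketch {
  /** Run `func` on every partition and hand each result to `resultHandler`
    * together with its partition index (roughly what a JobWaiter receives). */
  def runJob[T, U](partitions: IndexedSeq[Seq[T]], func: Iterator[T] => U)
                  (resultHandler: (Int, U) => Unit): Unit =
    partitions.indices.foreach { i =>
      resultHandler(i, func(partitions(i).iterator))
    }

  /** count(): func is "size of this partition"; the sizes are summed at the end. */
  def count[T](partitions: IndexedSeq[Seq[T]]): Long = {
    val results = new Array[Long](partitions.length)
    runJob(partitions, (it: Iterator[T]) => it.size.toLong) { (index, size) =>
      results(index) = size
    }
    results.sum
  }

  def main(args: Array[String]): Unit =
    println(count(IndexedSeq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int]))) // 5
}
```

Storing results by partition index rather than appending them keeps the handler independent of the order in which partitions finish.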
  2. [DAGScheduler]->private def submitStage(stage: Stage)
{
  ...
  // [1]
  val missing = getMissingParentStages(stage).sortBy(_.id)
  logDebug("missing: " + missing)
  if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    // [1]
    submitMissingTasks(stage, jobId.get)
  } else {
    for (parent <- missing) {
      submitStage(parent)
    }
    // [2]
    waitingStages += stage
  }
  ...
}
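The recursion in submitStage, together with the waiting queue, can be simulated in a few lines. In this sketch `St`, `MiniScheduler` and `stageFinished` are hypothetical; `stageFinished` re-submits every waiting stage, which approximates what submitWaitingStages does once a parent stage completes. Retrying waiting stages in stage-id order is a simplification.

```scala
import scala.collection.mutable

object SubmitStageSketch {
  // Hypothetical stage: an id plus the parent stages whose shuffle output it reads.
  final case class St(id: Int, parents: List[St])

  final class MiniScheduler {
    val waitingStages = mutable.LinkedHashSet.empty[St]
    val runningStages = mutable.LinkedHashSet.empty[St]
    val finished = mutable.Set.empty[Int]
    val submittedOrder = mutable.ListBuffer.empty[Int] // stand-in for submitMissingTasks

    private def missingParents(stage: St): List[St] =
      stage.parents.filterNot(p => finished(p.id)).sortBy(_.id)

    /** Same shape as submitStage: recurse into missing parents, otherwise run the stage. */
    def submitStage(stage: St): Unit =
      if (!waitingStages(stage) && !runningStages(stage) && !finished(stage.id)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          runningStages += stage
          submittedOrder += stage.id
        } else {
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }

    /** A stage completed: retry everything that was waiting (cf. submitWaitingStages). */
    def stageFinished(stage: St): Unit = {
      runningStages -= stage
      finished += stage.id
      val retry = waitingStages.toList.sortBy(_.id)
      waitingStages.clear()
      retry.foreach(submitStage)
    }
  }

  /** Stages 0 and 1 feed stage 2, which feeds result stage 3. */
  def demo(): (List[Int], List[Int]) = {
    val s0 = St(0, Nil)
    val s1 = St(1, Nil)
    val s2 = St(2, List(s0, s1))
    val s3 = St(3, List(s2))
    val scheduler = new MiniScheduler
    scheduler.submitStage(s3)
    val waitingAfterSubmit = scheduler.waitingStages.toList.map(_.id)
    List(s0, s1, s2, s3).foreach(scheduler.stageFinished)
    (waitingAfterSubmit, scheduler.submittedOrder.toList)
  }

  def main(args: Array[String]): Unit = println(demo()) // (List(2, 3),List(0, 1, 2, 3))
}
```

Calling `submitStage` on the result stage starts only stages 0 and 1 and leaves 2 and 3 waiting; as parents finish, the waiting stages are submitted in the order 2, then 3.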
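  3. [DAGScheduler]->getMissingParentStages(stage: Stage): List[Stage]
     Stages are split according to whether each dependency is a shuffle (wide) dependency or a narrow dependency.
{
  ...
  for (dep <- rdd.dependencies) {
    dep match {
      case shufDep: ShuffleDependency[_, _, _] =>
        // Shuffle (wide) dependency: stage boundary; the parent map stage is missing until its output is available
        val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
        if (!mapStage.isAvailable) {
          missing += mapStage
        }
      case narrowDep: NarrowDependency[_] =>
        // [2] Narrow dependency: the parent RDD belongs to the same stage, so keep walking its lineage
        waitingForVisit.push(narrowDep.rdd)
    }
  }
  ...
}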
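The walk in getMissingParentStages can be reproduced on a toy lineage. In the sketch below `Rdd`, `NarrowDep`, `ShuffleDep` and `missingParentShuffles` are hypothetical stand-ins, and the `available` predicate plays the role of `mapStage.isAvailable`. Unlike Spark's version, it skips the cache check (Spark only looks at an RDD's dependencies when some of its partitions are not cached) and returns shuffle ids instead of stage objects.

```scala
import scala.collection.mutable

object MissingParentsSketch {
  // Hypothetical lineage model (plain classes, compared by reference like RDDs).
  sealed trait Dep
  final class Rdd(val name: String, val deps: List[Dep] = Nil)
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep

  /** Ids of the shuffles whose map stage must still run before `stageRdd`'s stage.
    * `available(shuffleId)` plays the role of `mapStage.isAvailable`. */
  def missingParentShuffles(stageRdd: Rdd, available: Int => Boolean): List[Int] = {
    val missing = mutable.LinkedHashSet.empty[Int]
    val visited = mutable.Set.empty[Rdd]
    var waitingForVisit: List[Rdd] = List(stageRdd) // used as a stack
    while (waitingForVisit.nonEmpty) {
      val rdd = waitingForVisit.head
      waitingForVisit = waitingForVisit.tail
      if (visited.add(rdd)) {
        rdd.deps.foreach {
          case ShuffleDep(id, _) => if (!available(id)) missing += id          // stage boundary
          case NarrowDep(parent) => waitingForVisit = parent :: waitingForVisit // same stage
        }
      }
    }
    missing.toList.sorted
  }

  /** out = filter(join(map(a), b)): two shuffle inputs feed the final stage. */
  def example(available: Int => Boolean): List[Int] = {
    val a = new Rdd("a")
    val b = new Rdd("b")
    val mappedA = new Rdd("map(a)", List(NarrowDep(a)))
    val joined = new Rdd("join", List(ShuffleDep(0, mappedA), ShuffleDep(1, b)))
    val out = new Rdd("filter", List(NarrowDep(joined)))
    missingParentShuffles(out, available)
  }

  def main(args: Array[String]): Unit = {
    println(example(_ => false)) // List(0, 1)
    println(example(_ == 0))     // List(1)
  }
}
```

With two shuffle inputs and no map output available, both shuffles are reported as missing; once shuffle 0 has its output, only shuffle 1 remains. The walk never crosses a shuffle edge, which is exactly what makes the shuffle the stage boundary.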

ShuffleMapStage

  • Concept

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle. ⑤ They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, ⑥ they save map output files that can later be fetched by reduce tasks. ⑦ The shuffleDep field describes the shuffle each stage is part of, and ⑧ variables like outputLocs and numAvailableOutputs track how many map outputs are ready. ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage. For such stages, the ActiveJobs that submitted them are tracked in mapStageJobs. ⑨ Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.
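The paragraph above says that a shuffle map stage pipelines narrow operations such as map and filter, and that it writes map output for reduce tasks to fetch. Both points can be illustrated with plain Scala iterators. This is a sketch that assumes one task's work can be modeled as an iterator chain over a single partition; `runPipelinedTask` is a hypothetical name, and bucketing by `floorMod(key.hashCode, numReducers)` stands in for the shuffle's partitioner.

```scala
object PipelineSketch {
  /** One task of a hypothetical word-count shuffle map stage. flatMap, filter and
    * map are chained on one iterator, so each record flows through all three
    * before the next one is read; the records are then bucketed per reduce partition. */
  def runPipelinedTask(partition: Iterator[String], numReducers: Int): Map[Int, List[(String, Int)]] = {
    val records = partition
      .flatMap(_.split(" "))  // narrow op 1
      .filter(_.nonEmpty)     // narrow op 2
      .map(word => (word, 1)) // narrow op 3
    // "Map output": records grouped by the reduce partition that will fetch them.
    records.toList.groupBy { case (word, _) => java.lang.Math.floorMod(word.hashCode, numReducers) }
  }

  def main(args: Array[String]): Unit =
    println(runPipelinedTask(Iterator("a b", "a"), numReducers = 2))
}
```

No intermediate collection is built between the three narrow operations; only the final grouping materializes records, which corresponds to the point where a real task would write its shuffle output.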

  • Code walkthrough

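⑤ When the job is split into stages, a shuffle map stage contains all the non-shuffle operations (such as map and filter) that come after the previous shuffle.
⑥ The stage maintains the output information of each partition: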

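/**
   List of [[MapStatus]] for each partition. The index of the array
   is the map partition id, and each value in the array is the list of
   possible [[MapStatus]] for a partition (a single task might run
   multiple times).
   ③⑧ Location and status of the current RDD's map output, i.e. which
   executor each partition ran on and produced its output on. The
   DAGScheduler relies on this information when scheduling tasks
   (for example, findMissingPartitions returns the partitions whose
   entry is still empty).
**/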
  private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
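The bookkeeping that outputLocs and numAvailableOutputs provide can be sketched as a small class. `ShuffleMapStageLite` and `MapStatusLite` are hypothetical names, and the method names only loosely follow those of ShuffleMapStage in Spark 2.x (`addOutputLoc`, `removeOutputsOnExecutor`, `isAvailable`, `findMissingPartitions`); the bodies are simplified, and a status carries nothing but an executor id.

```scala
object OutputLocsSketch {
  // Hypothetical stand-in for MapStatus: only the executor holding the output.
  final case class MapStatusLite(executorId: String)

  final class ShuffleMapStageLite(val numPartitions: Int) {
    // Same shape as outputLocs: per map partition, every known status (a task may run more than once).
    private val outputLocs = Array.fill[List[MapStatusLite]](numPartitions)(Nil)
    private var numAvailableOutputs = 0

    def addOutputLoc(partition: Int, status: MapStatusLite): Unit = {
      val prev = outputLocs(partition)
      outputLocs(partition) = status :: prev
      if (prev.isEmpty) numAvailableOutputs += 1 // first output for this partition
    }

    /** Forget outputs that lived on a lost executor. */
    def removeOutputsOnExecutor(execId: String): Unit =
      for (p <- 0 until numPartitions) {
        val prev = outputLocs(p)
        val kept = prev.filterNot(_.executorId == execId)
        outputLocs(p) = kept
        if (prev.nonEmpty && kept.isEmpty) numAvailableOutputs -= 1
      }

    def isAvailable: Boolean = numAvailableOutputs == numPartitions

    /** Partitions with no known output: these are the ones that still need tasks. */
    def findMissingPartitions(): Seq[Int] = (0 until numPartitions).filter(outputLocs(_).isEmpty)
  }

  def main(args: Array[String]): Unit = {
    val stage = new ShuffleMapStageLite(3)
    stage.addOutputLoc(0, MapStatusLite("exec-1"))
    stage.addOutputLoc(2, MapStatusLite("exec-2"))
    println(stage.findMissingPartitions())
    stage.removeOutputsOnExecutor("exec-2")
    println(stage.isAvailable) // false
  }
}
```

Losing an executor makes the partitions whose only output lived there "missing" again, so the next submission of the stage only recomputes those partitions.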

[DAGScheduler]->submitMissingTasks(stage: Stage, jobId: Int)

                               ... 
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() 
                               ...
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
                                       ...
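The two branches of the match differ mainly in how a task id maps to an RDD partition: for a shuffle map stage the id is the RDD partition itself, while for a result stage it indexes into `s.partitions`, the partitions the job asked for. A hypothetical `makeTasks` helper makes that explicit. `TaskLite`, `rddPartitionOf` and the host strings are assumptions; in Spark the preferred locations come from getPreferredLocs, which consults cache locations, the RDD's own preferred locations, or those of its narrow parents.

```scala
object PreferredLocsSketch {
  // Hypothetical task description: stage, task index, RDD partition, preferred hosts.
  final case class TaskLite(stageId: Int, taskIndex: Int, rddPartition: Int, preferredHosts: Seq[String])

  /** For each partition still to compute, look up preferred hosts and build one task.
    * `rddPartitionOf` is the identity for a shuffle map stage; for a result stage it
    * is the job's partition list (`s.partitions` in the code above). */
  def makeTasks(stageId: Int,
                partitionsToCompute: Seq[Int],
                rddPartitionOf: Int => Int,
                preferredLocs: Int => Seq[String]): Seq[TaskLite] = {
    val taskIdToLocations: Map[Int, Seq[String]] =
      partitionsToCompute.map(id => (id, preferredLocs(rddPartitionOf(id)))).toMap
    partitionsToCompute.map(id => TaskLite(stageId, id, rddPartitionOf(id), taskIdToLocations(id)))
  }

  def main(args: Array[String]): Unit = {
    val hosts = Map(4 -> Seq("host-a"), 7 -> Seq("host-b"))
    // Result stage whose job asked for RDD partitions 4 and 7; index 0 already finished.
    println(makeTasks(stageId = 2, partitionsToCompute = Seq(1), rddPartitionOf = Vector(4, 7), preferredLocs = hosts))
    // Shuffle map stage: task index == RDD partition.
    println(makeTasks(1, Seq(0, 1), identity, _ => Nil))
  }
}
```

For a job that only asked for RDD partitions 4 and 7 and has already finished the first one, a single task is created, and it targets RDD partition 7.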

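⑦ A ShuffleDependency describes the whole shuffle; each stage's shuffleDep field identifies which shuffle the stage belongs to and what it has to do for that shuffle. This information is needed when the stage is submitted for execution.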

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
                                     ...
}
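To see what `partitioner`, `aggregator` and `mapSideCombine` mean for a single map task, here is a small in-memory sketch of map-side combining followed by hash partitioning. `bucketOf` follows the arithmetic HashPartitioner uses (a null key goes to partition 0, otherwise a non-negative modulo of the key's hashCode); `Agg` is a hypothetical, trimmed-down aggregator with only `createCombiner` and `mergeValue`, and nothing is written to shuffle files.

```scala
import scala.collection.mutable

object ShuffleWriteSketch {
  /** Non-negative modulo, the same arithmetic as Spark's Utils.nonNegativeMod. */
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  /** Reduce partition of a key, following HashPartitioner: null keys go to 0. */
  def bucketOf(key: Any, numReducers: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numReducers)

  /** Hypothetical, trimmed-down aggregator (Spark's Aggregator also has mergeCombiners). */
  final case class Agg[V, C](createCombiner: V => C, mergeValue: (C, V) => C)

  /** One map task with mapSideCombine = true: combine values per key first,
    * then group the combined records by reduce partition. */
  def combineAndBucket[K, V, C](records: Iterator[(K, V)], numReducers: Int,
                                agg: Agg[V, C]): Map[Int, Map[K, C]] = {
    val combined = mutable.LinkedHashMap.empty[K, C]
    records.foreach { case (k, v) =>
      combined(k) = combined.get(k) match {
        case Some(c) => agg.mergeValue(c, v)
        case None    => agg.createCombiner(v)
      }
    }
    combined.toMap.groupBy { case (k, _) => bucketOf(k, numReducers) }
  }

  def main(args: Array[String]): Unit = {
    val sum = Agg[Int, Int](v => v, _ + _)
    println(combineAndBucket(Iterator("a" -> 1, "b" -> 1, "a" -> 1), 2, sum))
  }
}
```

With map-side combine, the three input records become two combined records before anything is shuffled, which is why reduceByKey-style operations use it; groupByKey, for instance, creates its dependency with mapSideCombine = false.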