4.2 DAGScheduler - 形成Job和stage

1. 内部维护的重要结构

 // 后续会介绍, 这是维护的metric相关句柄 
 private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
 
  // 前文提到过的和job相关的结构
  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  // 几个map映射表, 将job到stage, shuffle到stage, job到具体的执行过程也就是activeJob进行映射
  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  /**
   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
   * and its values are arrays indexed by partition numbers. Each array value is the set of
   * locations where that RDD partition is cached.
   *
   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
   */
  private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
  // every task. When we detect a node failing, we note the current epoch number and failed
  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
  //
  // TODO: Garbage collect information about failure epochs when we know there are no more
  //       stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]

  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  // A closure serializer that we reuse.
  // This is only safe because DAGScheduler runs in a single thread.
  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

   // 这里实现了从DAG的蓝图到taskScheduler具体任务的下刷. 后边会具体介绍
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  taskScheduler.setDAGScheduler(this)

对于EventProcessLoop后边会详细介绍, 这里可以认为这是一个产品经理把任务安排到jira上去执行的过程.从蓝图到执行

2. submitJob / runJob

SparkContext把一个任务放到EventPostLoop里, 去实际执行
这里外层调用时runJob, runJob内会实际执行submitJob方法来跑任务

SparkContxt的runJob是在RDD里面启动的, 每当RDD执行一个action, 比如rdd.aggragate 其内部代码就会调用sc.runJob()
这就是我们前文中说过的RDD的所有Action会触发DAGScheduler开始执行任务
这里有一个非常重要的调优点, 就是尽可能避免对RDD执行forEach操作. 有些Lamda写习惯了的程序员会直接用这个关键字, 这个关键字底层是一个Action

/**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @throws Exception when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {

    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
          // 正确处理
      case JobFailed(exception: Exception) =>
          // 错误处理
    }
  }
SubmitJob
  1. 检查被执行的rdd内部的parition是否是合法的
  2. 增加nextJobId
  3. 如果rdd里没有parition, 说明这job没在跑, 直接返回
  4. 有parition的话, 就把jobid, 对应的parition, rdd和具体的操作扔给EventProcessLoop, 并等待结果以JobWaiter的形式返回.

3. EventProcessJob

从上文我们可以看到从RDD操作, 到SparkContext submit job到最后运行.
任务需求, 可以理解为客户最终需求, 进入了一个叫EventProcessJob的结构里.
这是一个DAGScheduler内部维护的任务队列, 同样是以监听器模型来设计的.

这里熟悉Python Celery 或者 Apache Storm的朋友可以更容易理解这里DAG干的事情. 当最终任务进入这个管道后, DAG会根据最终需求和RDD里保存的依赖关系一步一步向前反推各种需要执行的任务. 每一步都放到这个管道里. 这个思路非常类似传统流处理系统的pipeline task.
类似客户最终要产品D, 然后D放到任务列表里, 然后在公司内部喊话: "抓紧把D的前置任务C给我放进去". 重复这个过程, 直到发现没有任何前置任务的"A", 或者恰好已经做过的"B". 无论如何, 经过一轮一轮的喊话, 所有需要执行的任务都被压入到上文提到的Stage/Job结构中. 后续从头开始执行就行了.

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
     // 当有一个JOB被提交进来后, 进行处理
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
     
    // 一个MapStage提交进来后进行处理
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    
    //  用于特殊情况-失败情况 处理的
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    // 用于特殊情况-失败情况 处理的
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
     // 用于特殊情况-失败情况 处理的
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    // 用于特殊情况-失败情况 处理的
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    // 把可以跑任务的Executor加入到任务规划里来
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
   
 // 干掉那些删库跑路的
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
     
    
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    
    // 获取计算结果, 一般是为了获取当前任务依赖的上一级任务的中间结果
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    
     //  任务失败
    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    
    // 错误场景恢复
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

4. 从RDD的dependency到一个DAG图

  1. 从上文中我可以看到submitJob后从EventBus里启动JobSubmitted
  2. 从上文中我们知道所有的submitJob是从 RDD Action开始的, 所有的Action会导致一个stage生成. 可以看到一个方法newResultStage
private[scheduler] def handleJobSubmitted(
... ) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
    }
  1. 于是我们创建了一个stage, 赋予它stageId, 然后往这个stage里填东西
  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def newResultStage(
    ... ): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    // 生成一个stage, stageid是全局递增量
    stageIdToStage(id) = stage
    // 往这个stage里填job
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
  1. 继而跟踪代码, 我们发现程序试图在创建好这个stage后观察一下它依赖的所有资源是否到位了. 这里调用了getParentStages方法来实现新的stage.
    从这里我们也可以看到JOB内部会切分成多个stage
    这个在spark的web ui上也可以看到
    web ui
  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually mai  ntaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
      
        for (dep <- r.dependencies) {
          dep match {
        //  从RDD的依赖图中一层一层向上找, 所有shuffle操作会触发一个单独的stage的生成
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)
         
          //  非shuffle操作则直接被压入到任务列表里
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }
  1. 这个过程在DAGScheduler中还有几个实现的线路, 之后有机会再补充 [TODO]
    • RDD的shuffle操作会变成stage和stage之间的间隔, 每个shuffle操作都会触发一个新的stage的生成.
    • 而非shuffle操作, 则会被压入到工作队列里. 实际上就形成了这个stage以及和它关联的job
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • spark-submit的时候如何引入外部jar包 在通过spark-submit提交任务时,可以通过添加配置参数...
    博弈史密斯阅读 2,723评论 1 14
  • from: http://www.linuxidc.com/Linux/2016-03/129506.htm 背景...
    生活的探路者阅读 7,775评论 0 2
  • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AM...
    大佛爱读书阅读 2,811评论 0 20
  • 基于spark1.6 创建完SparkContext,然后执行Action算子 当RDD执行Action算子时(形...
    scandly阅读 1,316评论 0 1
  • 三十年前常见面 三十年后未遇着 茫茫人海近咫尺 有幸今天面前过 倘若再过若干年 永远永远碰不着 隐隐约约似认识 反...
    保卫中华阅读 203评论 0 0