Spark Task

  • Task Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
 Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:
PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible
NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
NO_PREF - data is accessed equally quickly from anywhere and has no locality preference
RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
ANY - data is elsewhere on the network and not in the same rack
 Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options:
a) wait until a busy CPU frees up to start a task on data on the same server, or
b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
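As a minimal sketch (illustrative values only; these are the standard spark.locality.* settings), the wait can be tuned globally or per level when building the SparkConf:

import org.apache.spark.SparkConf

// Raise these waits if long-running tasks show poor locality in the UI.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // shared fallback wait for all levels
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY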


  • 1. Obtaining partition location information

[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
                                  ...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => 
            /**
             getPreferredLocs obtains the location information of the
             partition's data; the different ways this information is
             derived are analyzed below.
            **/
            (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
            val job = s.activeJob.get
            partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
                                   ...
[DAGScheduler]->submitMissingTasks->getPreferredLocsInternal
    private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
                                  ...
     /**
      cacheLocs maintains the location information for the RDD's partitions;
      each entry is a TaskLocation instance.
      If cacheLocs has no location information for the current partition, the
      following logic runs: if the RDD's storageLevel is NONE, Nil is returned
      and stored in cacheLocs; otherwise the blockManagerMaster is queried for
      the BlockManagers holding that partition, and an ExecutorCacheTaskLocation
      is created and put into cacheLocs.
      See getCacheLocs for details. Blocks and caching are analyzed in the
      Storage chapter.
      **/
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }

    /**
     The RDD method preferredLocations first tries to read the partition's
     checkpoint information; if the RDD has not been checkpointed, it calls
     getPreferredLocations(split), whose implementation differs per RDD. For
     example, HadoopRDD derives the current partition's location from its
     Hadoop InputSplit. If the current RDD is neither cached nor an input RDD,
     the next step applies.
    **/
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    /**
      When the RDD is neither cached nor an input RDD, i.e. preferredLocations
      yields no partition location information, the location is found by
      recursing into the corresponding partitions of the parent RDDs; this
      only works for narrow dependencies.
    **/
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }
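To see what preferredLocations yields for an input RDD, a quick hedged sketch (it assumes an existing SparkContext sc and a hypothetical HDFS path):

val rdd = sc.textFile("hdfs:///path/to/input")  // hypothetical path
rdd.partitions.foreach { p =>
  // preferredLocations consults checkpoint data first, then getPreferredLocations(split)
  println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
}

If the RDD were cached instead, getCacheLocs would return ExecutorCacheTaskLocation entries, which is what allows PROCESS_LOCAL scheduling in later stages.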
  • 2. Task construction

[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
                                    ...
 /**
  Construct a different task type depending on the stage type.
  Each partition corresponds to one task, and every task carries the location
  information of its target partition; all tasks are finally submitted as a
  TaskSet.
  Note: the task's actual execution logic has already been serialized into
  taskBinary and broadcast to every executor. The tasks constructed here only
  add the location information so that the driver-side TaskScheduler can
  schedule them; the TaskSet itself is not serialized and broadcast.
 **/
 val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
                                   ...
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
          }
        case stage: ResultStage =>
           val job = stage.activeJob.get
            partitionsToCompute.map { id =>
                                   ...
              new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary,
                part, locs, id, properties, stage.latestInfo.taskMetrics)
          }
      }
    }
                                     ...
if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      /**
          Construct a TaskSet and submit it. In most cases the scheduler is a
          TaskSchedulerImpl; the DAGScheduler submits tasks through this
          instance, and TaskSchedulerImpl manages the TaskSet through a
          TaskSetManager instance. (While being instantiated, the
          TaskSetManager assigns tasks to executors; there are exactly two
          assignment paths, determined by the preferredLocation types described
          above:
          ExecutorCacheTaskLocation, i.e. RDDs cached on an executor, and
          HDFSCacheTaskLocation, i.e. HDFS input data or checkpoint data.
          See [TaskSetManager]->addPendingTask; a simplified sketch follows
          this code block.) Details are covered in the 'Spark scheduling'
          chapter.
       **/
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
                                      ...
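The locality-bucketed bookkeeping referenced above can be pictured with a simplified, purely illustrative sketch; this is not the real TaskSetManager code, and the class and method here are hypothetical:

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical simplification: pending task indices bucketed by locality preference.
class PendingQueues {
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]  // candidates for PROCESS_LOCAL
  val forHost     = new HashMap[String, ArrayBuffer[Int]]  // candidates for NODE_LOCAL
  val noPrefs     = new ArrayBuffer[Int]                    // NO_PREF tasks
  val all         = new ArrayBuffer[Int]                    // fallback bucket (ANY)

  // locs: (host, optional executorId) pairs derived from each task's TaskLocation
  def addPendingTask(index: Int, locs: Seq[(String, Option[String])]): Unit = {
    for ((host, execOpt) <- locs) {
      execOpt.foreach(e => forExecutor.getOrElseUpdate(e, new ArrayBuffer[Int]) += index)
      forHost.getOrElseUpdate(host, new ArrayBuffer[Int]) += index
    }
    if (locs.isEmpty) noPrefs += index
    all += index
  }
}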
  • 3. Task submission

[TaskSchedulerImpl]->def submitTasks(taskSet: TaskSet)

 override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " +
           tasks.length + " tasks")
    this.synchronized {
      /**Schedules the tasks within a single TaskSet in the  
         TaskSchedulerImpl. This class keeps track of each task, retries 
        tasks if they fail (up to a limited number of times), and handles 
        locality-aware scheduling for this TaskSet via delay scheduling.
        The main interfaces to it are resourceOffer, which asks the 
        TaskSet whether it wants to run a task on one node, and
        statusUpdate, which tells it that one of its tasks changed state
        (e.g. finished).
      **/
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
                                    ...
      /**
          There are two implementations corresponding to different task
          scheduling algorithms (just as in OS scheduling):
          1. FIFOSchedulableBuilder
          2. FairSchedulableBuilder
          The schedulableBuilder holds a Pool that manages the TaskSetManagers
          and returns them in an order determined by the scheduling algorithm.
          The pool's checkSpeculatableTasks method is also used to re-launch
          tasks for jobs that have speculation enabled.
          The actual operation here is to put the TaskSetManager into the pool
          for asynchronous scheduling.
       **/
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +"and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    /**
      When running on YARN, backend here is an instance of
      CoarseGrainedSchedulerBackend. This backend runs on the driver, holds
      information about all executors in the current job, and can manage them.
      This call ultimately invokes backend.makeOffers to request resources and
      trigger task scheduling.
     **/
    backend.reviveOffers()
  }
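Both the choice of schedulableBuilder and whether checkSpeculatableTasks has any work to do are driven by configuration; a minimal sketch with illustrative values (these are standard Spark settings):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")  // FIFO (default) -> FIFOSchedulableBuilder, FAIR -> FairSchedulableBuilder
  .set("spark.speculation", "true")     // lets the pool's checkSpeculatableTasks re-launch slow tasks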
  • 4. Task resource offers and scheduling

[CoarseGrainedSchedulerBackend]->private def makeOffers()
    /**
      This method is invoked whenever the system's resources change, for
      example when an executor registers with the backend or when a task
      finishes: the executor sends a message to the backend, and the last step
      of the backend's handling triggers another round of task scheduling. All
      asynchronous scheduling in Spark follows a similar pattern, e.g. executor
      scheduling in standalone mode and the scheduling of waiting stages.
     **/
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      /**
        Collect the CPU resources currently available on each executor and
        return them as WorkerOffer instances. The later task assignment uses
        these offers as the basis for the locality logic.
        **/
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      /**
        launchTasks serializes each TaskDescription and sends it to the
        corresponding executor for execution.
        scheduler.resourceOffers(workOffers) performs the actual task
        assignment logic.
      **/
      launchTasks(scheduler.resourceOffers(workOffers))
    }
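The freeCores advertised in each WorkerOffer ultimately come from the executor configuration; a small sketch of the relevant knobs (illustrative values, standard Spark settings):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "4")  // cores each executor can offer
  .set("spark.task.cpus", "1")       // cores one task consumes, so up to 4 concurrent tasks per executor here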
[TaskSchedulerImpl]->def resourceOffers(offers: Seq[WorkerOffer])
 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    var newExecAvail = false
    /**
     If any executor in the system is newly added,
     executorAdded(o.executorId, o.host) eventually calls
     submitWaitingStages() in the DAGScheduler to submit pending stages again;
     just like the task scheduling described above, a change in system
     resources triggers another round of scheduling.
     **/
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    /**
        Build a list of tasks to assign to each worker.
        Tasks are assigned according to the number of cores available on each
        executor; each core runs one task.
    **/
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    /**
      As described above, the scheduling algorithm (FIFO or FAIR) determines
      the order of the TaskSets returned here, i.e. their final execution
      order.
     **/
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
       /**
          When a new executor has been started, the task locality is
          recomputed here via TaskSetManager.recomputeLocality.
       **/
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
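To observe which locality level each task was finally launched with, one option is a SparkListener; a hedged sketch (assumes an existing SparkContext sc; TaskInfo exposes the assigned locality):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

class LocalityListener extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val info = taskStart.taskInfo
    // taskLocality is the level chosen during resourceOffers (PROCESS_LOCAL ... ANY)
    println(s"task ${info.taskId} on ${info.host}: ${info.taskLocality}")
  }
}

// sc.addSparkListener(new LocalityListener())  // register on the driver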