[第七章] spark 资源调度算法深入剖析

前面几章我们重点讲述了spark的的原理,sparkContext的初使化,spark主备切换,master的注册等。其中我们分析源码时,不管是driver,还是application在注册Master时都,在最后都有这样的一个方法调用:

 ...
 schedule()

这个方法其实就是资源调度算法(这个方法太重要了,也是核心中的核心)
下面我们就来深入分析这个方法。

 private def schedule() {
  #当master不是alive时,直接reuturn
  #也就是说Standby是不参与资源调度的
    if (state != RecoveryState.ALIVE) { return }
//Random.shuffle作用就是把集合随机打乱
//取出workers中所有之前注册的worker,进行过滤,必须 状态 是Alive的worker
//把worker随机的打乱
 val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(
_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size

为什么要调度driver,大家想一下什么时候会注册driver,并且导致driver被调度
其实只有在模式是yarn-cluster提交后,才会注册driver,因为standalone与yarn-client
都会在本地启动dirver,而不会来注册driver,就更不可能被master来调度
所以说下面的这个for只会运行在yarn-cluster模式下提交下。

 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      
      /**while中的条件,当还有活着的worker没有被遍历到,就继续遍历
       * 而且这个driver在这个worker中还没有启动,launched=false
       */
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        /**
         * 如果这个worker的空闲内存容量 大于等于driver所需的内存
         * 而且worker空间的CPU大于等于driver所需的CPU数量
         */
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
         //启动driver
          launchDriver(worker, driver)
          //把此driver从waitingDrivers中去掉
          waitingDrivers -= driver
          launched = true
        }
        //将指针指向下一个worker
        curPos = (curPos + 1) % numWorkersAlive
      }
    }

上面就是对Drvier的调度。
接着我们看对Application的资源调度
/**
* Application的调度机制(核心之核心 )
* 两种算法:一种是spreadOutApps(默认),另一种是非spreadOutApps
*
* 通过个算法,其实 会将每个application,要启动的executor都平均分配 到每个worker上
* 比如有20cpu core,有10个worker,那么实际会遍历两遍,每次循环,每个worker分配一个core
* 最后每个worker分配了两个core
*/

  if (spreadOutApps) {
          for (app <- waitingApps if app.coresLeft > 0) {
        //从workerk中,过滤出状态是ALIVE的
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        //创建一个空数组,存储了要分配的每个worker的cpu
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        //获取到底可以分配多少个cpu,取application所需的cpu数量与worker可用的cpu数量的最小值
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        //只要有还有要分配的cpu没有分配完就while
        while (toAssign > 0) {
          //如果可用的cpu大于已经分配的cpu数量,其实就是还有可用的cpu
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            //将要分配的cpu数量减一
            toAssign -= 1
            //给这个worker分配的cpu加1
            assigned(pos) += 1
          }
          //指定指向下一个worker
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
       //给每个worker分配了application需要的cpu core后
        for (pos <- 0 until numUsable) {
          //判断这个worker已经分配了core
          if (assigned(pos) > 0) {
            //创建了ExecutorDesc对象,封装了executor的信息
            //将这个executor添加到application缓存区
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            //在worker上启动Executor
            launchExecutor(usableWorkers(pos), exec)
            //将application的状态为RUNNING
            app.state = ApplicationState.RUNNING
           }
        }
      }
    }

关于这种调度算法总结:

我们之前说了在spark-submit中指定了多少个executor,每个execuotr需要多少个cpu core 实际上基本这个机制,最后,executor的实际数量,每个executor需要的core,可能与配置不一样 因为这里我们是基于总的cpu来分配的,就是说比如,我们配置了需要三个executor来启动application, 每个executor需要三个core,那么就总需9个core,其实在这种算法中,如果我们有9个worker,会给每个 worker分配一个core,然后给每个worker启动一个executor.最后其实是启动了9个executor,每个 executor有一个core

第二种调度算法:

这种算法与上面的正好相反,每个application,都尽可能少的分配到worker上去, 比如总共有10个worker,每个有10个core application总共要分配20个core,那么只会分配到两个worker上,每个worker都占满了这10个core那么其它的application只能分配另外的worker上去了。 所以我们在spark-submit中配置了要10个executor,每个execuotr需要2个core 那么共需要20个core,但这种算法中,其实只会启动两个executor,每个executor有10个core

//遍历worker,并且状态是ALIVE,还有空闲的cpu的worker
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
       //遍历application,并且还有需要分配的core的applicztion
        for (app <- waitingApps if app.coresLeft > 0) {
          //判断当前这个worker可以被application使用
          if (canUse(app, worker)) {
            //取worker可用的cpu数量与application要分配的cpu数量的最小值
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            //如果小于0,说明没有core可分了
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
                      //在worker上启动executor
              launchExecutor(worker, exec)
              //设置application的状态是
              app.state = ApplicationState.RUNNING
            }
          }

其中里面有一个非常重要的方法:

launchExecutor(worker, exec)
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    //将executor加入worker缓存
    worker.addExecutor(exec)
    //向worker的actor发送LaunchExecutor ,在worker中启动executor
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

这个方法里的
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
正是向worker发送这个消息(封闭了akka的消息通信) ,在worker端调用这个方法来启动Executor(关于worker如何启动Eexecutor与Application,我们在下一节会详细的剖析)

本章中每一个字(包括源码注解)都是作者敲出来的,你感觉有用,帮点击'喜欢'

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容