批处理调度器volcano

在深度学习场景下,大部分任务都需要批量调度功能,也就是需要保证多个Pod同时地调度。它主要算法就是all or nothing的算法,保证整个资源要么可以调度,要么就不要调度,如果无法调度那么就排队,保证整个资源不会被饿死。这是一个比较常见的需求。例如原生 K8s 调度器调度 100 个 Pod 的作业时,即使调度了 99 个,最后 1 个因资源不足永远卡住,导致整个作业阻塞。

volcano就是实现gang-scheduling的批处理调度器,是Kubeflow生态中的一个组件。解决批处理场景特有的“要么全调要么不调”需求。其核心能力包括Gang Scheduling、公平调度 (Fair Sharing/Fair Queue)、队列管理 (Queue)、作业管理 (Job Lifecycle)、高级调度策略 (Binpack, DRF, Topology)。

Volcano 核心概念

Volcano 引入了几个新概念:

  • Queue:用于管理和优先级排序任务,做资源隔离
  • PodGroup:将一组 Pod 关联起来,实现 Gang Scheduling 等策略的基础,是作业调度的原子单位
  • VolcanoJob:用户提交的作业单元

这些都是 K8s 里的自定义资源,也就是我们能够通过 kubectl 命令查到相应的资源对象,好比 Deployment、Service、Pod 这些。

PodGroup 是一组强关联的 Pod 集合,本质是CRD。核心字段spec.minMember是Gang Scheduling 的关键! 作业成功运行所需的最小 Pod 数量。调度器必须保证 >= minMember 个 Pod 同时被调度运行,作业才能推进。

Queue 是一个 PodGroup 队列,也是一个 CRD,用于分组作业、资源隔离、配额管理和优先级控制。

VolcanoJob 是 Volcano 中的一个核心概念,其实还是用的K8s Operator 模式,扩展了 Kubernetes 的 Job 资源。VolcanoJob 不仅包括了 Kubernetes Job 的所有特性,还加入了对批处理作业的额外支持,使得 Volcano 能够更好地适应高性能和大规模计算任务的需求。

VolcanoJob 根据配置去创建相应的 PodGroup 出来,逻辑上将属于同一个作业(VolcanoJob)的所有 Pod 聚合在一起。而 PodGroup 最终会被当做一个整体被 Volcano Scheduler 调度。在调度的过程中,Volcano 还用到了 Queue 来实现 PodGroup 的排队、优先级控制等逻辑。

Volcano 调度核心流程

Volcano 调度主要包含两个概念:

  • Actions:enqueue、allocate、backfill 这些调度动作
  • Plugins:Action 中执行的算法逻辑,就取决于注册进去的 plugins。

Volcano 的调度流程由一系列预定义的 Actions 按顺序执行构成。每个 Action 可以挂载多个 Plugin 来实现具体策略。Action和Plugin是Volcano 强大灵活性的基石。Volcano 的核心调度策略(Gang, Fairness (DRF), Priority, Binpacking, Task Topology)都是通过实现特定的 Plugin 来完成的。Action 提供了框架,Plugin 填充了策略。
典型的 Action 顺序是:

  • Enqueue: 作业入队
  • Reclaim: 资源回收
  • Allocate: 资源分配
  • Preempt: 抢占
  • Backfill: 回填

源码分析

1. Action 实现

Volcano 在 pkg/scheduler 中放了调度器相关的代码,里面有一个 actions 目录。

/pkg/scheduler/actions/factory.go#L29
func init() {
    framework.RegisterAction(reclaim.New())
    framework.RegisterAction(allocate.New())
    framework.RegisterAction(backfill.New())
    framework.RegisterAction(preempt.New())
    framework.RegisterAction(enqueue.New())
    framework.RegisterAction(shuffle.New())
}

可以看到这里注册了6个 actions。RegisterAction 方法的实现也很简单,有一个 actionMap 来保存所有的 actions:

/pkg/scheduler/framework/plugins.go
var actionMap = map[string]Action{}

// RegisterAction register action
func RegisterAction(act Action) {
    pluginMutex.Lock()
    defer pluginMutex.Unlock()

    actionMap[act.Name()] = act
}

2. 调度器逻辑

Run() 方法负责启动一个 Volcano 调度器:

/cmd/scheduler/main.go#L83
if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

sched, err := scheduler.NewScheduler(config, opt)
run := func(ctx context.Context) {
        sched.Run(ctx.Done())
        <-ctx.Done()
    }

NewScheduler() 方法:

/pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
    //...
    cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners, opt.ResyncPeriod)
    scheduler := &Scheduler{
        schedulerConf:  opt.SchedulerConf,
        fileWatcher:    watcher,
        cache:          cache,
        schedulePeriod: opt.SchedulePeriod,
        dumper:         schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
    }

    return scheduler, nil
}

Scheduler 对象:

type Scheduler struct {
    cache          schedcache.Cache
    schedulerConf  string
    fileWatcher    filewatcher.FileWatcher
    schedulePeriod time.Duration
    once           sync.Once

    mutex          sync.Mutex
    actions        []framework.Action
    plugins        []conf.Tier
    configurations []conf.Configuration
    metricsConf    map[string]string
    dumper         schedcache.Dumper
}

Run() 方法:

func (pc *Scheduler) Run(stopCh <-chan struct{}) {
    pc.loadSchedulerConf()
    go pc.watchSchedulerConf(stopCh)
    // Start cache for policy.
    pc.cache.SetMetricsConf(pc.metricsConf)
    pc.cache.Run(stopCh)
    klog.V(2).Infof("Scheduler completes Initialization and start to run")
    go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
    if options.ServerOpts.EnableCacheDumper {
        pc.dumper.ListenForSignal(stopCh)
    }
    go runSchedulerSocket()
}

这里就是 Scheduler 的执行逻辑了,runOnce 方法被周期性调用:

func (pc *Scheduler) runOnce() {
    //...
    // Load ConfigMap to check which action is enabled.
    conf.EnabledActionMap = make(map[string]bool)
    for _, action := range actions {
        conf.EnabledActionMap[action.Name()] = true
    }

    ssn := framework.OpenSession(pc.cache, plugins, configurations)
    defer func() {
        framework.CloseSession(ssn)
        metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
    }()

    for _, action := range actions {
        actionStartTime := time.Now()
        action.Execute(ssn)
        metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
    }
}

可以看到在 runOnce 中的2个关键步骤:

  1. ssn := framework.OpenSession(pc.cache, plugins, configurations)遍历 actions,
  2. 调用 action.Execute(ssn)
    这里的 actions 集合是什么呢?OpenSession 拿到的 plugins 又是啥呢?
    进一步跟代码可以找到如下默认配置:
var DefaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
  - name: priority
  - name: gang
  - name: conformance
- plugins:
  - name: overcommit
  - name: drf
  - name: predicates
  - name: proportion
  - name: nodeorder

所以默认配置下,执行的 actions 是 enqueue, allocate, backfill 三个。

3. actions 和 plugins 的调用逻辑

前面看framework.OpenSession() 函数打开了一个 Session。不过什么是 Session 呢?来具体看下 OpenSession() 函数的实现:

/pkg/scheduler/framework/framework.go
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
    ssn := openSession(cache)
    ssn.Tiers = tiers
    ssn.Configurations = configurations
    ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
    ssn.PodLister = NewPodLister(ssn)

    for _, tier := range tiers {
        for _, plugin := range tier.Plugins {
            if pb, found := GetPluginBuilder(plugin.Name); !found {
                klog.Errorf("Failed to get plugin %s.", plugin.Name)
            } else {
                plugin := pb(plugin.Arguments)
                ssn.plugins[plugin.Name()] = plugin
                onSessionOpenStart := time.Now()
                plugin.OnSessionOpen(ssn)
                metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
            }
        }
    }
    return ssn
}

在 OpenSession() 函数中,plugins 被遍历,然后依次调用 plugin.OnSessionOpen(ssn) 方法。
一个个 plugins 注册具体的算法函数到 Session 里,然后 actions 顺序执行的过程中,到 Session 里去取相应的算法函数来执行。

Plugin Function Purpose Used By
JobOrderFn Job priority comparison allocate, preempt, reclaim
TaskOrderFn Task priority within jobs allocate, preempt, reclaim
PredicateFn Node feasibility filtering allocate, preempt, reclaim
NodeOrderFn Node scoring and ranking allocate, backfill
JobValidFn Job admission validation enqueue
Preemptable Victim task selection preempt, reclaim
Volcano scheduler工作流

Session作为调度周期上下文,承载了作业/队列状态快照Cache和中间结果Statement,是决策的基础,保证调度的事务性。

4. Action 分析:enqueue

Enqueue: 作业入队。检查 Queue 状态是否 Open、资源配额是否足够、优先级等,决定哪些 PodGroup(作业) 有资格进入本次调度周期。这是批处理资源队列管理的关键!

enqueue Action 的 Execute() 方法如下:

/pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {

    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    queueSet := sets.NewString()
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    for _, job := range ssn.Jobs {
        //...
        }

    for {
        //...
    }

这里的 queues 是一个 Priority Queue,队列的实现用了 heap 包,实现了一个“最大堆”,也就是每次 Pop() 会拿到一个优先级最高的 item。执行了2个 for 循环。
先看第一个for循环:

    for _, job := range ssn.Jobs {
        if job.ScheduleStartTimestamp.IsZero() {
            ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
                Time: time.Now(),
            }
        }
        if queue, found := ssn.Queues[job.Queue]; !found {
            klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
                job.Queue, job.Namespace, job.Name)
            continue
        } else if !queueSet.Has(string(queue.UID)) {
            klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
                queue.Name, job.Namespace, job.Name)

            queueSet.Insert(string(queue.UID))
            queues.Push(queue)
        }

        if job.IsPending() {
            if _, found := jobsMap[job.Queue]; !found {
                jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
            }
            klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
            jobsMap[job.Queue].Push(job)
        }
    }

这个 for 循环主要做2件事情,一个是遍历 jobs 的过程中判断用到了哪些 Queue(K8s 自定义资源对象),将这些 Queue 保存到 queueSet 和 queues 中;另外一个就是将处于 Pending 状态的 jobs 加入到 jobsMap 中。
再看第二个无限循环:

    for {
        if queues.Empty() {
            break
        }

        queue := queues.Pop().(*api.QueueInfo)

        // skip the Queue that has no pending job
        jobs, found := jobsMap[queue.UID]
        if !found || jobs.Empty() {
            continue
        }
        job := jobs.Pop().(*api.JobInfo)

        // 符合条件的 PodGroup 被标记为“可调度”,其 Pod 进入后续环节。
        if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
            ssn.JobEnqueued(job)
            job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
            ssn.Jobs[job.UID] = job
        }

        // Added Queue back until no job in Queue.
        queues.Push(queue)
    }

这个循环的逻辑是消化队列里的 jobs。首先将全局队列按照优先级 Pop 一个高优队列出来,然后根据这个队列的 UID 找到本地 jobsMap 里对应的 jobs 队列,这又是一个优先级队列。最后从这个优先级队列中 Pop 一个高优 Job 出来,筛选有资格参与本次调度的 PodGroup (作业),符合条件的 PodGroup 被标记为“可调度”。

总的来说,enqueue 过程就是按照队列的优先级顺序,将队列中的 jobs 再按照优先级依次标记为 "Inqueue" 状态(job.PodGroup.Status.Phase = "Inqueue")。

5. Action 分析:allocate

Allocate: 资源分配 (核心中的核心)。尝试为待调度的 Pod 分配节点。

allocate.Execute() 方法的实现如下:

/pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
    klog.V(5).Infof("Enter Allocate ...")
    defer klog.V(5).Infof("Leaving Allocate ...")

    alloc.parseArguments(ssn)

    // the allocation for pod may have many stages
    // 1. pick a queue named Q (using ssn.QueueOrderFn)
    // 2. pick a job named J from Q (using ssn.JobOrderFn)
    // 3. pick a task T from J (using ssn.TaskOrderFn)
    // 4. use predicateFn to filter out node that T can not be allocated on.
    // 5. use ssn.NodeOrderFn to judge the best node and assign it to T

    // queues sort queues by QueueOrderFn.
    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    // jobsMap is used to find job with the highest priority in given queue.
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    alloc.session = ssn
    alloc.pickUpQueuesAndJobs(queues, jobsMap)
    klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
    alloc.allocateResources(queues, jobsMap)
}

调用了plugin注册到session的函数:QueueOrderFn等。
validJobFn, JobOrderFn封装在pickUpQueuesAndJobs方法。
TaskOrderFn,PredicateFn,NodeOrderFn封装在allocateResources方法。
代码经过多次重构有点晦涩,主流程其实挺简单,这个过程包括作业的predicate和prioritize。使用predicateFn预选,过滤掉不能分配作业的node;使用NodeOrderFn打分来找到最适合的分配节点:


allocate流程

pickUpQueuesAndJobs的逻辑是一个for循环,遍历 jobs,将其按照 queue 不同存到 jobsMap 中。

func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
    ssn := alloc.session
    for _, job := range ssn.Jobs {
        // If not config enqueue action, change Pending pg into Inqueue state to avoid blocking job scheduling.
        //...
        if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
            klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
            continue
        }
        //...
        if _, found := jobsMap[job.Queue]; !found {
            jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
            queues.Push(ssn.Queues[job.Queue])
        }
        jobsMap[job.Queue].Push(job)
    }
}

allocateResources的逻辑主要是按照优先级依次给 tasks 寻找最合适的 node,找到后“预占”资源,于是按顺序逐步给所有的 tasks 都找到了最佳节点。

// prioritizeNodes selects the highest score node.
// 寻找最优节点
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
// Allocate idle resource to the task.
// 将前面找到的最佳节点相应资源分配给当前 task
if err := alloc.allocateResourcesForTask(stmt, task, bestNode, job); err == nil {
    jobNewAllocatedHyperNode = getJobNewAllocatedHyperNode(ssn, bestNode.Name, job, jobNewAllocatedHyperNode)
}

分配最佳节点后,调用Commit方法真正绑定到 K8s API Server。调用 K8s Binding API (kubeClient.CoreV1().Pods().Bind()) 设置 Pod 的 .spec.nodeName。并且更新 PodGroup 状态。

stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
if stmt != nil {
    stmt.Commit()
}

这里Action用Statement模式记录原子操作,保证调度决策的一致性。


Statement模式实现的调度事务

6. Gang scheduling

Gang调度策略是volcano-scheduler的核心调度算法之一,它满足了调度过程中的“All or nothing”的调度需求,避免Pod的任意调度导致集群资源的浪费。具体算法是,观察Job下的Pod已调度数量是否满足了最小运行数量,当Job的最小运行数量得到满足时,为Job下的所有Pod执行调度动作,否则,不执行。

Gang scheduling flow

Gang调度策略是通过Gang plugin集成validation和ordering函数实现。session初始化的时候Gang plugin注册了如下几个关键函数。这些函数在上面Action:allocate中被调用。

Function Purpose
JobOrderFn Orders jobs for gang scheduling priority
JobValidFn Validates if job meets gang requirements
JobReadyFn Determines if job is ready to be scheduled
PreemptableFn Protects gang members from preemption

validation函数:

pkg/scheduler/plugins/gang/gang.go
validJobFn := func(obj interface{}) *api.ValidateResult {
        job, ok := obj.(*api.JobInfo)
        if !ok {
            return &api.ValidateResult{
                Pass:    false,
                Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
            }
        }

        if valid := job.CheckTaskValid(); !valid {
            return &api.ValidateResult{
                Pass:    false,
                Reason:  v1beta1.NotEnoughPodsOfTaskReason,
                Message: "Not enough valid pods of each task for gang-scheduling",
            }
        }

        vtn := job.ValidTaskNum()
        if vtn < job.MinAvailable {
            return &api.ValidateResult{
                Pass:   false,
                Reason: v1beta1.NotEnoughPodsReason,
                Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
                    vtn, job.MinAvailable),
            }
        }
        return nil
    }

ordering函数:

jobOrderFn := func(l, r interface{}) int {
        lv := l.(*api.JobInfo)
        rv := r.(*api.JobInfo)

        lReady := lv.IsReady()
        rReady := rv.IsReady()

        klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t",
            lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady)

        if lReady && rReady {
            return 0
        }

        if lReady {
            return 1
        }

        if rReady {
            return -1
        }

        return 0
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容