在深度学习场景下,大部分任务都需要批量调度功能,也就是需要保证多个Pod同时地调度。它主要算法就是all or nothing的算法,保证整个资源要么可以调度,要么就不要调度,如果无法调度那么就排队,保证整个资源不会被饿死。这是一个比较常见的需求。例如原生 K8s 调度器调度 100 个 Pod 的作业时,即使调度了 99 个,最后 1 个因资源不足永远卡住,导致整个作业阻塞。
volcano就是实现gang-scheduling的批处理调度器,是Kubeflow生态中的一个组件。解决批处理场景特有的“要么全调要么不调”需求。其核心能力包括Gang Scheduling、公平调度 (Fair Sharing/Fair Queue)、队列管理 (Queue)、作业管理 (Job Lifecycle)、高级调度策略 (Binpack, DRF, Topology)。
Volcano 核心概念
Volcano 引入了几个新概念:
- Queue:用于管理和优先级排序任务,做资源隔离
- PodGroup:将一组 Pod 关联起来,实现 Gang Scheduling 等策略的基础,是作业调度的原子单位
- VolcanoJob:用户提交的作业单元
这些都是 K8s 里的自定义资源,也就是我们能够通过 kubectl 命令查到相应的资源对象,好比 Deployment、Service、Pod 这些。
PodGroup 是一组强关联的 Pod 集合,本质是CRD。核心字段spec.minMember是Gang Scheduling 的关键! 作业成功运行所需的最小 Pod 数量。调度器必须保证 >= minMember 个 Pod 同时被调度运行,作业才能推进。
Queue 是一个 PodGroup 队列,也是一个 CRD,用于分组作业、资源隔离、配额管理和优先级控制。
VolcanoJob 是 Volcano 中的一个核心概念,其实还是用的K8s Operator 模式,扩展了 Kubernetes 的 Job 资源。VolcanoJob 不仅包括了 Kubernetes Job 的所有特性,还加入了对批处理作业的额外支持,使得 Volcano 能够更好地适应高性能和大规模计算任务的需求。
VolcanoJob 根据配置去创建相应的 PodGroup 出来,逻辑上将属于同一个作业(VolcanoJob)的所有 Pod 聚合在一起。而 PodGroup 最终会被当做一个整体被 Volcano Scheduler 调度。在调度的过程中,Volcano 还用到了 Queue 来实现 PodGroup 的排队、优先级控制等逻辑。
Volcano 调度核心流程
Volcano 调度主要包含两个概念:
- Actions:enqueue、allocate、backfill 这些调度动作
- Plugins:Action 中执行的算法逻辑,就取决于注册进去的 plugins。
Volcano 的调度流程由一系列预定义的 Actions 按顺序执行构成。每个 Action 可以挂载多个 Plugin 来实现具体策略。Action和Plugin是Volcano 强大灵活性的基石。Volcano 的核心调度策略(Gang, Fairness (DRF), Priority, Binpacking, Task Topology)都是通过实现特定的 Plugin 来完成的。Action 提供了框架,Plugin 填充了策略。
典型的 Action 顺序是:
- Enqueue: 作业入队
- Reclaim: 资源回收
- Allocate: 资源分配
- Preempt: 抢占
- Backfill: 回填
源码分析
1. Action 实现
Volcano 在 pkg/scheduler 中放了调度器相关的代码,里面有一个 actions 目录。
/pkg/scheduler/actions/factory.go#L29
func init() {
framework.RegisterAction(reclaim.New())
framework.RegisterAction(allocate.New())
framework.RegisterAction(backfill.New())
framework.RegisterAction(preempt.New())
framework.RegisterAction(enqueue.New())
framework.RegisterAction(shuffle.New())
}
可以看到这里注册了6个 actions。RegisterAction 方法的实现也很简单,有一个 actionMap 来保存所有的 actions:
/pkg/scheduler/framework/plugins.go
var actionMap = map[string]Action{}
// RegisterAction register action
func RegisterAction(act Action) {
pluginMutex.Lock()
defer pluginMutex.Unlock()
actionMap[act.Name()] = act
}
2. 调度器逻辑
Run() 方法负责启动一个 Volcano 调度器:
/cmd/scheduler/main.go#L83
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
sched, err := scheduler.NewScheduler(config, opt)
run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
}
NewScheduler() 方法:
/pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
//...
cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners, opt.ResyncPeriod)
scheduler := &Scheduler{
schedulerConf: opt.SchedulerConf,
fileWatcher: watcher,
cache: cache,
schedulePeriod: opt.SchedulePeriod,
dumper: schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
}
return scheduler, nil
}
Scheduler 对象:
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
metricsConf map[string]string
dumper schedcache.Dumper
}
Run() 方法:
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
pc.loadSchedulerConf()
go pc.watchSchedulerConf(stopCh)
// Start cache for policy.
pc.cache.SetMetricsConf(pc.metricsConf)
pc.cache.Run(stopCh)
klog.V(2).Infof("Scheduler completes Initialization and start to run")
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
if options.ServerOpts.EnableCacheDumper {
pc.dumper.ListenForSignal(stopCh)
}
go runSchedulerSocket()
}
这里就是 Scheduler 的执行逻辑了,runOnce 方法被周期性调用:
func (pc *Scheduler) runOnce() {
//...
// Load ConfigMap to check which action is enabled.
conf.EnabledActionMap = make(map[string]bool)
for _, action := range actions {
conf.EnabledActionMap[action.Name()] = true
}
ssn := framework.OpenSession(pc.cache, plugins, configurations)
defer func() {
framework.CloseSession(ssn)
metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
}()
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
}
可以看到在 runOnce 中的2个关键步骤:
- ssn := framework.OpenSession(pc.cache, plugins, configurations)遍历 actions,
- 调用 action.Execute(ssn)
这里的 actions 集合是什么呢?OpenSession 拿到的 plugins 又是啥呢?
进一步跟代码可以找到如下默认配置:
var DefaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
所以默认配置下,执行的 actions 是 enqueue, allocate, backfill 三个。
3. actions 和 plugins 的调用逻辑
前面看framework.OpenSession() 函数打开了一个 Session。不过什么是 Session 呢?来具体看下 OpenSession() 函数的实现:
/pkg/scheduler/framework/framework.go
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
ssn := openSession(cache)
ssn.Tiers = tiers
ssn.Configurations = configurations
ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
ssn.PodLister = NewPodLister(ssn)
for _, tier := range tiers {
for _, plugin := range tier.Plugins {
if pb, found := GetPluginBuilder(plugin.Name); !found {
klog.Errorf("Failed to get plugin %s.", plugin.Name)
} else {
plugin := pb(plugin.Arguments)
ssn.plugins[plugin.Name()] = plugin
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}
}
}
return ssn
}
在 OpenSession() 函数中,plugins 被遍历,然后依次调用 plugin.OnSessionOpen(ssn) 方法。
一个个 plugins 注册具体的算法函数到 Session 里,然后 actions 顺序执行的过程中,到 Session 里去取相应的算法函数来执行。
Plugin Function | Purpose | Used By |
---|---|---|
JobOrderFn | Job priority comparison | allocate, preempt, reclaim |
TaskOrderFn | Task priority within jobs | allocate, preempt, reclaim |
PredicateFn | Node feasibility filtering | allocate, preempt, reclaim |
NodeOrderFn | Node scoring and ranking | allocate, backfill |
JobValidFn | Job admission validation | enqueue |
Preemptable | Victim task selection | preempt, reclaim |
Session作为调度周期上下文,承载了作业/队列状态快照Cache和中间结果Statement,是决策的基础,保证调度的事务性。
4. Action 分析:enqueue
Enqueue: 作业入队。检查 Queue 状态是否 Open、资源配额是否足够、优先级等,决定哪些 PodGroup(作业) 有资格进入本次调度周期。这是批处理资源队列管理的关键!
enqueue Action 的 Execute() 方法如下:
/pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueSet := sets.NewString()
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {
//...
}
for {
//...
}
这里的 queues 是一个 Priority Queue,队列的实现用了 heap 包,实现了一个“最大堆”,也就是每次 Pop() 会拿到一个优先级最高的 item。执行了2个 for 循环。
先看第一个for循环:
for _, job := range ssn.Jobs {
if job.ScheduleStartTimestamp.IsZero() {
ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
Time: time.Now(),
}
}
if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
} else if !queueSet.Has(string(queue.UID)) {
klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
queueSet.Insert(string(queue.UID))
queues.Push(queue)
}
if job.IsPending() {
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
}
}
这个 for 循环主要做2件事情,一个是遍历 jobs 的过程中判断用到了哪些 Queue(K8s 自定义资源对象),将这些 Queue 保存到 queueSet 和 queues 中;另外一个就是将处于 Pending 状态的 jobs 加入到 jobsMap 中。
再看第二个无限循环:
for {
if queues.Empty() {
break
}
queue := queues.Pop().(*api.QueueInfo)
// skip the Queue that has no pending job
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
continue
}
job := jobs.Pop().(*api.JobInfo)
// 符合条件的 PodGroup 被标记为“可调度”,其 Pod 进入后续环节。
if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
ssn.JobEnqueued(job)
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
ssn.Jobs[job.UID] = job
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
这个循环的逻辑是消化队列里的 jobs。首先将全局队列按照优先级 Pop 一个高优队列出来,然后根据这个队列的 UID 找到本地 jobsMap 里对应的 jobs 队列,这又是一个优先级队列。最后从这个优先级队列中 Pop 一个高优 Job 出来,筛选有资格参与本次调度的 PodGroup (作业),符合条件的 PodGroup 被标记为“可调度”。
总的来说,enqueue 过程就是按照队列的优先级顺序,将队列中的 jobs 再按照优先级依次标记为 "Inqueue" 状态(job.PodGroup.Status.Phase = "Inqueue")。
5. Action 分析:allocate
Allocate: 资源分配 (核心中的核心)。尝试为待调度的 Pod 分配节点。
allocate.Execute() 方法的实现如下:
/pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")
alloc.parseArguments(ssn)
// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
// 3. pick a task T from J (using ssn.TaskOrderFn)
// 4. use predicateFn to filter out node that T can not be allocated on.
// 5. use ssn.NodeOrderFn to judge the best node and assign it to T
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
alloc.session = ssn
alloc.pickUpQueuesAndJobs(queues, jobsMap)
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
alloc.allocateResources(queues, jobsMap)
}
调用了plugin注册到session的函数:QueueOrderFn等。
validJobFn, JobOrderFn封装在pickUpQueuesAndJobs方法。
TaskOrderFn,PredicateFn,NodeOrderFn封装在allocateResources方法。
代码经过多次重构有点晦涩,主流程其实挺简单,这个过程包括作业的predicate和prioritize。使用predicateFn预选,过滤掉不能分配作业的node;使用NodeOrderFn打分来找到最适合的分配节点:
pickUpQueuesAndJobs的逻辑是一个for循环,遍历 jobs,将其按照 queue 不同存到 jobsMap 中。
func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
ssn := alloc.session
for _, job := range ssn.Jobs {
// If not config enqueue action, change Pending pg into Inqueue state to avoid blocking job scheduling.
//...
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}
//...
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
queues.Push(ssn.Queues[job.Queue])
}
jobsMap[job.Queue].Push(job)
}
}
allocateResources的逻辑主要是按照优先级依次给 tasks 寻找最合适的 node,找到后“预占”资源,于是按顺序逐步给所有的 tasks 都找到了最佳节点。
// prioritizeNodes selects the highest score node.
// 寻找最优节点
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
// Allocate idle resource to the task.
// 将前面找到的最佳节点相应资源分配给当前 task
if err := alloc.allocateResourcesForTask(stmt, task, bestNode, job); err == nil {
jobNewAllocatedHyperNode = getJobNewAllocatedHyperNode(ssn, bestNode.Name, job, jobNewAllocatedHyperNode)
}
分配最佳节点后,调用Commit方法真正绑定到 K8s API Server。调用 K8s Binding API (kubeClient.CoreV1().Pods().Bind()) 设置 Pod 的 .spec.nodeName。并且更新 PodGroup 状态。
stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
if stmt != nil {
stmt.Commit()
}
这里Action用Statement模式记录原子操作,保证调度决策的一致性。
6. Gang scheduling
Gang调度策略是volcano-scheduler的核心调度算法之一,它满足了调度过程中的“All or nothing”的调度需求,避免Pod的任意调度导致集群资源的浪费。具体算法是,观察Job下的Pod已调度数量是否满足了最小运行数量,当Job的最小运行数量得到满足时,为Job下的所有Pod执行调度动作,否则,不执行。
Gang调度策略是通过Gang plugin集成validation和ordering函数实现。session初始化的时候Gang plugin注册了如下几个关键函数。这些函数在上面Action:allocate中被调用。
Function | Purpose |
---|---|
JobOrderFn | Orders jobs for gang scheduling priority |
JobValidFn | Validates if job meets gang requirements |
JobReadyFn | Determines if job is ready to be scheduled |
PreemptableFn | Protects gang members from preemption |
validation函数:
pkg/scheduler/plugins/gang/gang.go
validJobFn := func(obj interface{}) *api.ValidateResult {
job, ok := obj.(*api.JobInfo)
if !ok {
return &api.ValidateResult{
Pass: false,
Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
}
}
if valid := job.CheckTaskValid(); !valid {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.NotEnoughPodsOfTaskReason,
Message: "Not enough valid pods of each task for gang-scheduling",
}
}
vtn := job.ValidTaskNum()
if vtn < job.MinAvailable {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.NotEnoughPodsReason,
Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
vtn, job.MinAvailable),
}
}
return nil
}
ordering函数:
jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)
lReady := lv.IsReady()
rReady := rv.IsReady()
klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t",
lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady)
if lReady && rReady {
return 0
}
if lReady {
return 1
}
if rReady {
return -1
}
return 0
}