背景
Kubernetes 最近的版本引入了 Workload Aware Scheduling（工作负载感知调度）功能，支持 gang 调度。
本文记录 gang 调度的实现原理。
简单总结
PreEnqueue：gang 插件的 PreEnqueue 扩展点会阻止满足以下条件的 Pod 进入调度队列：设置了 SchedulingGroup、所属 PodGroup 的调度策略为 gang，且该 PodGroup 当前的 Pod 总数尚未达到 minCount。
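Permit：对于同样设置了 SchedulingGroup 且 PodGroup 策略为 gang 的 Pod，如果该 PodGroup 中已调度的 Pod 数尚未达到 minCount，gang 插件的 Permit 扩展点会让 Pod 在 Permit 阶段等待，暂不进入 bind。达到 minCount 后，放行当前 Pod，并唤醒同组中正在等待的 Pod 一起继续 bind。若等待超过超时时间（5 分钟），Pod 会被拒绝，随后执行 Unreserve 插件等回滚操作。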
源码
pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go中
const (
	// ...

	// permitTimeoutDuration is the maximum wait time that Permit returns
	// together with fwk.Wait when a gang member must wait for enough pods of
	// its PodGroup to be scheduled.
	permitTimeoutDuration = 5 * time.Minute
)
PreEnqueue 插件（Pod 进入调度队列前执行）：
func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Status {
没设置SchedulingGroup则不需要gang调度
if pod.Spec.SchedulingGroup == nil {
return nil
}
获取podGroup信息
namespace := pod.Namespace
schedulingGroup := pod.Spec.SchedulingGroup
podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
...
判断策略是否是gang调度
policy := podGroup.Spec.SchedulingPolicy
if policy.Gang == nil {
return nil
}
...
获取podGroup状态
podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName)
...
判断podGroup状态的所有pod数量是否满足minCount
allPodsCount := podGroupState.AllPodsCount()
if allPodsCount < int(policy.Gang.MinCount) {
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "waiting for minCount pods from a gang to appear in scheduling queue")
}
return nil
}
Permit 插件（Pod bind 之前执行）：
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
没设置SchedulingGroup则不需要gang调度
if pod.Spec.SchedulingGroup == nil {
return nil, 0
}
...
获取podGroup信息
namespace := pod.Namespace
schedulingGroup := pod.Spec.SchedulingGroup
podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
...
判断策略是否是gang调度
policy := podGroup.Spec.SchedulingPolicy
if policy.Gang == nil {
return nil, 0
}
...
获取podGroup状态
podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
if err != nil {
return fwk.AsStatus(err), 0
}
...
判断podGroup状态的已调度的pod数量是否满足minCount
scheduledPodsCount := podGroupState.ScheduledPodsCount()
if scheduledPodsCount < int(policy.Gang.MinCount) {
return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration
}
允许这个pod并且通知其他同podgroup的pod继续处理
assumedPods := podGroupState.AssumedPods()
for podUID := range assumedPods {
waitingPod := pl.handle.GetWaitingPod(podUID)
if waitingPod != nil {
waitingPod.Allow(Name)
}
}
}
pkg/scheduler/framework/runtime/waiting_pods_map.go中
创建waitingPod对象
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
wp := &waitingPod{
pod: pod,
s: make(chan *fwk.Status, 1),
}
...
设置定时器,如果plugin超时,则reject
for k, v := range pluginsMaxWaitTime {
plugin, waitTime := k, v
wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
waitTime, plugin)
wp.Reject(plugin, msg)
})
}
return wp
}
Allow：Permit 结果为允许
func (w *waitingPod) Allow(pluginName string) {
...
通知permit结果
select {
case w.s <- fwk.NewStatus(fwk.Success, ""):
default:
}
...
}
Reject：Permit 结果为拒绝（等待超时的定时器也会调用它）
// Reject ends the wait for this pod with an Unschedulable status attributed
// to pluginName and carrying msg. It returns the result of stopWithStatus.
func (w *waitingPod) Reject(pluginName, msg string) bool {
	var code fwk.Code = fwk.Unschedulable
	return w.stopWithStatus(code, pluginName, msg)
}
stopWithStatus：携带指定状态结束等待
// stopWithStatus ends the wait for this pod by publishing a status built from
// status and msg, tagged with pluginName, on w.s. Reject calls it with
// fwk.Unschedulable.
func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
...
// Non-blocking send of the final status: if the channel cannot accept it
// immediately, the status is dropped.
select {
case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
default:
}
// The bool result is produced in code elided from this excerpt.
...
}
pkg/scheduler/schedule_one.go中
运行bind循环
func (sched *Scheduler) runBindingCycle(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) {
运行bind循环
status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate)
如果未成功,则处理bind循环错误
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status)
return
}
}
bind循环
func (sched *Scheduler) bindingCycle(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *fwk.Status {
...
等待permit
if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
...
return status
}
...
}
func (sched *Scheduler) handleBindingCycleError(
ctx context.Context,
state fwk.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
scheduleResult ScheduleResult,
status *fwk.Status) {
...
执行unreserve plugin
if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed")
} else {
如果是rejected
if status.IsRejected() {
当成pod delete事件,将除assume pod以外的pod移到active queue或者backoff queue
defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
return assumedPod.UID != pod.UID
})
} else {
当成pod delete事件,将除所有移到active queue或者backoff queue
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, nil)
}
}
...
}
WaitOnPermit：阻塞等待 Permit 结果
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
...
获取等待的pod
waitingPod := f.waitingPods.get(pod.UID)
...
获取permit状态
s := <-waitingPod.s
...
}