从源码看k8s gang scheduling

背景

k8s 最近的版本加入 workload aware scheduling 功能,可以支持gang调度
这里记录一下gang调度的实现

简单总结

通过gang plugin中的preenqueue plugin拦截配置了SchedulingGroup且podgroup policy为gang但是mincount不满足的的pod进入调度队列
通过gang plugin中的permit plugin拦截配置了SchedulingGroup且podgroup policy为gang但是mincount不满足的的pod进行bind,执行unreserve plugin等后续操作

源码

pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go中

const (
    permit超时时间
    ...
    permitTimeoutDuration = 5 * time.Minute
)



入调度队列前plugin
func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Status {
    没设置SchedulingGroup则不需要gang调度
    if pod.Spec.SchedulingGroup == nil {
        return nil
    }
    获取podGroup信息
    namespace := pod.Namespace
    schedulingGroup := pod.Spec.SchedulingGroup

    podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
    ...
    判断策略是否是gang调度
    policy := podGroup.Spec.SchedulingPolicy
    if policy.Gang == nil {
        return nil
    }
    ...
    获取podGroup状态
    podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName)
    ...
    判断podGroup状态的所有pod数量是否满足minCount
    allPodsCount := podGroupState.AllPodsCount()
    if allPodsCount < int(policy.Gang.MinCount) {
        return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "waiting for minCount pods from a gang to appear in scheduling queue")
    }

    return nil
}


pod bind前plugin
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
    没设置SchedulingGroup则不需要gang调度
    if pod.Spec.SchedulingGroup == nil {
        return nil, 0
    }
    ...
    获取podGroup信息
    namespace := pod.Namespace
    schedulingGroup := pod.Spec.SchedulingGroup

    podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
    ...
    判断策略是否是gang调度
    policy := podGroup.Spec.SchedulingPolicy
    if policy.Gang == nil {
        return nil, 0
    }
    ...
    获取podGroup状态
    podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
    if err != nil {
        return fwk.AsStatus(err), 0
    }
    ...
    判断podGroup状态的已调度的pod数量是否满足minCount
    scheduledPodsCount := podGroupState.ScheduledPodsCount()
    if scheduledPodsCount < int(policy.Gang.MinCount) {
        return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration
    }

    允许这个pod并且通知其他同podgroup的pod继续处理
    assumedPods := podGroupState.AssumedPods()

    for podUID := range assumedPods {
        waitingPod := pl.handle.GetWaitingPod(podUID)
        if waitingPod != nil {
            waitingPod.Allow(Name)
        }
    }

}

pkg/scheduler/framework/runtime/waiting_pods_map.go中

创建waitingPod对象
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        s: make(chan *fwk.Status, 1),
    }
    ...
    设置定时器,如果plugin超时,则reject
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(plugin, msg)
        })
    }

    return wp
}


permit结果为允许
func (w *waitingPod) Allow(pluginName string) {
    ...
    通知permit结果
    select {
    case w.s <- fwk.NewStatus(fwk.Success, ""):
    default:
    }
    ...
}

结果为拒绝
func (w *waitingPod) Reject(pluginName, msg string) bool {
    return w.stopWithStatus(fwk.Unschedulable, pluginName, msg)
}


携带状态停止
func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
    ...
    select {
    case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
    default:
    }
    ...
}

pkg/scheduler/schedule_one.go中

运行bind循环
func (sched *Scheduler) runBindingCycle(
    ctx context.Context,
    state fwk.CycleState,
    schedFramework framework.Framework,
    scheduleResult ScheduleResult,
    assumedPodInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate) {
    运行bind循环
    status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate)
    如果未成功,则处理bind循环错误
    if !status.IsSuccess() {
        sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status)
        return
    }
}

bind循环
func (sched *Scheduler) bindingCycle(
    ctx context.Context,
    state fwk.CycleState,
    schedFramework framework.Framework,
    scheduleResult ScheduleResult,
    assumedPodInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate) *fwk.Status {
    ...
    等待permit
    if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
        ...
        return status
    }
    ...
}


func (sched *Scheduler) handleBindingCycleError(
    ctx context.Context,
    state fwk.CycleState,
    fwk framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
    scheduleResult ScheduleResult,
    status *fwk.Status) {
    ...
    执行unreserve plugin
    if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
        utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed")
    } else {
        如果是rejected
        if status.IsRejected() {
            当成pod delete事件,将除assume pod以外的pod移到active queue或者backoff queue
            defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
                return assumedPod.UID != pod.UID
            })
        } else {
            当成pod delete事件,将除所有移到active queue或者backoff queue
            sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, nil)
        }
    }
    ...
}
等待permit
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
    ...
    获取等待的pod
    waitingPod := f.waitingPods.get(pod.UID)
    ...
    获取permit状态
    s := <-waitingPod.s
    ...
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容