调度器中assumedPod详解

type schedulerCache struct {
    stop   <-chan struct{}
    ttl    time.Duration
    period time.Duration

    // This mutex guards all fields within this cache struct.
    mu sync.RWMutex
    // a set of assumed pod keys.
    // The key could further be used to get an entry in podStates.
    assumedPods map[string]bool
    // a map from pod key to podState.
    podStates map[string]*podState
    nodes     map[string]*nodeInfoListItem
    // headNode points to the most recently updated NodeInfo in "nodes". It is the
    // head of the linked list.
    headNode *nodeInfoListItem
    nodeTree *nodeTree
    // A map from image name to its imageState.
    imageStates map[string]*imageState
}

调度器缓存中的assumedPods就是所有假定pod的集合

假定pod的产生和作用

调度完成后,pod会加入assumedPods集合

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    ...
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.
    assumed.Spec.NodeName = host

    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.Errorf("scheduler cache AssumePod failed: %v", err)
        return err
    }
    // if "assumed" is a nominated pod, we should remove it from internal cache
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }

    return nil
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods[key] = true
    return nil
}

会把此pod加到node缓存的相应node中,占用节点资源

假定pod的自动清理

假定pod的加入时机是"优选"结束, bind之前,在bind结束后,给podState加上deadline超时等待绑定成功的事件

// finishBinding exists to make tests determinitistic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.RLock()
    defer cache.mu.RUnlock()

    klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
    currState, ok := cache.podStates[key]
    if ok && cache.assumedPods[key] {
        dl := now.Add(cache.ttl)
        currState.bindingFinished = true
        currState.deadline = &dl
    }
    return nil
}

有一个无限循环的携程在执行以下函数,对超时的假定pod进行warn日志打印和移除,同时解除节点资源的占用

// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    defer cache.updateMetrics()

    // The size of assumedPods should be small
    for key := range cache.assumedPods {
        ps, ok := cache.podStates[key]
        if !ok {
            klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        if !ps.bindingFinished {
            klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
                ps.pod.Namespace, ps.pod.Name)
            continue
        }
        if now.After(*ps.deadline) {
            klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
            if err := cache.expirePod(key, ps); err != nil {
                klog.Errorf("ExpirePod failed for %s: %v", key, err)
            }
        }
    }
}
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
    if err := cache.removePod(ps.pod); err != nil {
        return err
    }
    delete(cache.assumedPods, key)
    delete(cache.podStates, key)
    return nil
}

假定pod正常移除(pod转正)

bind操作给pod的Spec.NodeName赋值,触发pod更新事件

在启动调度器时,设置了几个事件监听,其中有一个已调度pod的事件监听

// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
) {
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
        },
    )
    ...
// assignedPod selects pods that are assigned (scheduled and running).
func assignedPod(pod *v1.Pod) bool {
    return len(pod.Spec.NodeName) != 0
}

当pod的更新事件是pod.Spec.NodeName从无到有时,触发AddFunc,将pod加到调度缓存

func (sched *Scheduler) addPodToCache(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        klog.Errorf("cannot convert to *v1.Pod: %v", obj)
        return
    }
    klog.V(3).Infof("add event for scheduled pod %s/%s ", pod.Namespace, pod.Name)

    if err := sched.SchedulerCache.AddPod(pod); err != nil {
        klog.Errorf("scheduler cache AddPod failed: %v", err)
    }

    sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    case ok && cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            // The pod was added to a different node than it was assumed to.
            klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
            // Clean this up.
            if err = cache.removePod(currState.pod); err != nil {
                klog.Errorf("removing pod error: %v", err)
            }
            cache.addPod(pod)
        }
        delete(cache.assumedPods, key)
        cache.podStates[key].deadline = nil
        cache.podStates[key].pod = pod
    case !ok:
        // Pod was expired. We should add it back.
        cache.addPod(pod)
        ps := &podState{
            pod: pod,
        }
        cache.podStates[key] = ps
    default:
        return fmt.Errorf("pod %v was already in added state", key)
    }
    return nil
}

这时,如果此pod在假定pod缓存中, 就从假定pod中删除,同时重置deadline,这样pod就正式占用节点资源而不是假定了

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容