调度器中assumedPod详解

// schedulerCache holds the scheduler's cached view of pods and nodes,
// including pods that have been assumed onto a node (AssumePod) but whose
// placement has not yet been confirmed by an informer add event (AddPod).
type schedulerCache struct {
    stop   <-chan struct{} // NOTE(review): presumably closed to stop background loops; usage not shown here
    ttl    time.Duration   // added to the bind-finish time to form an assumed pod's deadline (see finishBinding)
    period time.Duration   // NOTE(review): presumably the interval of the assumed-pod cleanup loop; confirm

    // This mutex guards all fields within this cache struct.
    mu sync.RWMutex
    // a set of assumed pod keys.
    // The key could further be used to get an entry in podStates.
    assumedPods map[string]bool
    // a map from pod key to podState.
    podStates map[string]*podState
    nodes     map[string]*nodeInfoListItem // NOTE(review): presumably keyed by node name; confirm
    // headNode points to the most recently updated NodeInfo in "nodes". It is the
    // head of the linked list.
    headNode *nodeInfoListItem
    nodeTree *nodeTree
    // A map from image name to its imageState.
    imageStates map[string]*imageState
}

调度器缓存中的assumedPods字段就是所有假定(assumed)pod的key集合,通过key可以在podStates中查到对应的podState

假定pod的产生和作用

调度算法为pod选出节点之后、执行bind之前,pod会被加入assumedPods集合

// scheduleOne (excerpt; elided parts are shown as "..."): once a host has been
// selected for the pod, the pod is assumed onto that host before binding.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    // Reserve the selected host in the cache so that binding can proceed asynchronously.
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    ...
}
// assume records in the scheduler cache that `assumed` is already placed on
// host, so that the actual binding can be carried out asynchronously.
// It mutates `assumed` by setting Spec.NodeName to host.
//
// The binding is optimistically expected to succeed; per the scheduler's
// design, if binding later fails the resources reserved for the assumed pod
// are released. If the scheduling queue tracked the pod as nominated, that
// nomination is dropped here.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    assumed.Spec.NodeName = host

    err := sched.SchedulerCache.AssumePod(assumed)
    if err != nil {
        klog.Errorf("scheduler cache AssumePod failed: %v", err)
        return err
    }

    // An assumed pod no longer needs to be kept as a nominated pod.
    if queue := sched.SchedulingQueue; queue != nil {
        queue.DeleteNominatedPodIfExists(assumed)
    }
    return nil
}
// AssumePod registers pod in the cache as an assumed pod. It calls addPod to
// add the pod to the node cache, then records the pod's key in both
// assumedPods and podStates.
//
// It returns an error if the pod key cannot be computed, or if the key is
// already present in podStates.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    if _, exists := cache.podStates[key]; exists {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    cache.assumedPods[key] = true
    cache.podStates[key] = &podState{pod: pod}
    return nil
}

AssumePod会把此pod加到node缓存中对应的node上,提前占用该节点的资源(此时真正的bind尚未完成)

假定pod的自动清理

pod是在"优选"结束之后、bind之前被加入假定集合的。bind结束后,finishBinding会把podState标记为bindingFinished,并设置deadline(当前时间加上ttl),在此期限内等待绑定成功的事件到来。

// finishBinding marks an assumed pod's binding as finished and starts its
// expiration clock: bindingFinished is set and the deadline becomes
// now + cache.ttl, after which cleanupAssumedPods may expire the pod.
// Pods that are not (or are no longer) assumed are left untouched.
//
// finishBinding exists to make tests deterministic by injecting now as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    // A write lock is required: this method mutates the shared podState
    // (bindingFinished, deadline). Writing under RLock would race with other
    // goroutines that hold the read lock at the same time.
    cache.mu.Lock()
    defer cache.mu.Unlock()

    klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
    currState, ok := cache.podStates[key]
    if ok && cache.assumedPods[key] {
        dl := now.Add(cache.ttl)
        currState.bindingFinished = true
        currState.deadline = &dl
    }
    return nil
}

后台有一个协程循环执行以下函数:对于已完成绑定且已超过deadline的假定pod,打印warning日志并将其移除,同时释放其占用的节点资源

// cleanupAssumedPods expires every assumed pod whose binding has finished and
// whose deadline lies before now. Pods whose binding is still in progress are
// skipped. It also reports metrics on the cache size for nodes, pods, and
// assumed pods; the metrics are updated before the lock is released.
//
// now is taken as an argument so that tests can be deterministic.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    defer cache.updateMetrics()

    // assumedPods is expected to stay small, so a full scan is acceptable.
    for key := range cache.assumedPods {
        state, found := cache.podStates[key]
        if !found {
            klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        if !state.bindingFinished {
            klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
                state.pod.Namespace, state.pod.Name)
            continue
        }
        if !now.After(*state.deadline) {
            continue
        }
        klog.Warningf("Pod %s/%s expired", state.pod.Namespace, state.pod.Name)
        if err := cache.expirePod(key, state); err != nil {
            klog.Errorf("ExpirePod failed for %s: %v", key, err)
        }
    }
}
// expirePod drops an assumed pod from the cache. It first calls removePod on
// the cached pod object, then forgets key in both podStates and assumedPods.
// If removePod fails, its error is returned and neither map is modified.
// Callers are expected to hold cache.mu (cleanupAssumedPods does).
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
    err := cache.removePod(ps.pod)
    if err != nil {
        return err
    }
    delete(cache.podStates, key)
    delete(cache.assumedPods, key)
    return nil
}

假定pod正常移除(pod转正)

bind操作给pod的Spec.NodeName赋值,触发pod更新事件

在启动调度器时,设置了几个事件监听,其中有一个已调度pod的事件监听

// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
// (Excerpt: only the scheduled-pod handler is shown; the remainder is elided.)
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
) {
    // scheduled pod cache: only pods that pass assignedPod (non-empty
    // Spec.NodeName) are delivered to the cache handlers below.
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    // client-go wraps the last known state of a deleted object;
                    // unwrap it and filter on the inner pod.
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
                    return false
                default:
                    utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sched.addPodToCache,
                UpdateFunc: sched.updatePodInCache,
                DeleteFunc: sched.deletePodFromCache,
            },
        },
    )
    ...
// assignedPod reports whether pod has already been assigned to a node,
// i.e. whether its Spec.NodeName is non-empty.
func assignedPod(pod *v1.Pod) bool {
    return pod.Spec.NodeName != ""
}

当pod的更新事件使pod.Spec.NodeName从空变为非空时(即pod从不满足过滤条件变为满足),FilteringResourceEventHandler会触发AddFunc,将pod加到调度缓存

// addPodToCache is the informer AddFunc for scheduled pods. It adds the pod to
// the scheduler cache and then notifies the scheduling queue that an assigned
// pod was added. A cache error is only logged; the queue is notified regardless.
// Objects that are not *v1.Pod are logged and ignored.
func (sched *Scheduler) addPodToCache(obj interface{}) {
    pod, isPod := obj.(*v1.Pod)
    if !isPod {
        klog.Errorf("cannot convert to *v1.Pod: %v", obj)
        return
    }
    klog.V(3).Infof("add event for scheduled pod %s/%s ", pod.Namespace, pod.Name)

    err := sched.SchedulerCache.AddPod(pod)
    if err != nil {
        klog.Errorf("scheduler cache AddPod failed: %v", err)
    }

    sched.SchedulingQueue.AssignedPodAdded(pod)
}
// AddPod records pod, observed through an informer add event, in the cache.
//
//   - If the pod is currently assumed, it is confirmed: its key is removed
//     from assumedPods, its deadline is cleared and the stored object is
//     replaced. If it was added to a different node than it was assumed to,
//     the assumed copy is removed (removePod) and the new one added (addPod).
//   - If the pod is not in podStates (e.g. its assumption already expired),
//     it is added back.
//   - Otherwise the pod is already in the added state and an error is returned.
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
    key, err := framework.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    case ok && cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            // The pod was added to a different node than it was assumed to.
            // The assumed node comes from the cached state; the actual node
            // comes from the incoming event.
            klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
            // Clean this up.
            if err = cache.removePod(currState.pod); err != nil {
                klog.Errorf("removing pod error: %v", err)
            }
            cache.addPod(pod)
        }
        delete(cache.assumedPods, key)
        // currState is the same *podState stored in podStates[key].
        currState.deadline = nil
        currState.pod = pod
    case !ok:
        // Pod was expired. We should add it back.
        cache.addPod(pod)
        cache.podStates[key] = &podState{pod: pod}
    default:
        return fmt.Errorf("pod %v was already in added state", key)
    }
    return nil
}

这时,如果此pod在假定pod缓存中,就把它从assumedPods中删除,并清空deadline。若实际绑定的节点与假定的节点不同,还会先把它从假定节点上移除,再加到实际节点上。这样pod就正式占用节点资源,而不再是假定状态了

©著作权归作者所有,转载或内容合作请联系作者