replicaset controller

Deployment和replicaset controller关系剖析

首先看一个deployment的yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2021-01-11T02:54:57Z"
  generation: 1
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: nginx
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:1.15.2
        imagePullPolicy: IfNotPresent
        name: nginx
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30

kubectl create -f nginx-deploy.yaml #生成Deployment

[root@k8s-master01 ~]# kubectl get deployment nginx -owide
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
nginx 3/3 3 3 8m38s nginx nginx:1.15.2 app=nginx
[root@k8s-master01 ~]# kubectl get pod -owide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
nginx-66bbc9fdc5-llhnn 1/1 Running 0 8m55s 172.18.195.60 k8s-master03 none none
nginx-66bbc9fdc5-sdhnh 1/1 Running 0 8m55s 172.25.244.203 k8s-master01 none none
nginx-66bbc9fdc5-x4x67 1/1 Running 0 8m55s 172.27.14.230 k8s-node02 none none

对生成的deployment的nginx进行日志分析,确定在deployment产生replica set的信息
#kubectl describe deploy nginx

Events:
  Type    Reason             Age   From                   Message
  ----    ------             ----  ----                   -------
  Normal  ScalingReplicaSet  10m   deployment-controller  Scaled up replica set nginx-66bbc9fdc5 to 3

确认Replica Set

oot@k8s-master01 ~]# kubectl get rs  nginx-66bbc9fdc5
NAME               DESIRED   CURRENT   READY   AGE
nginx-66bbc9fdc5   3         3         3       29m
[root@k8s-master01 ~]#kubectl describe rs nginx-66bbc9fdc5
Name:           nginx-66bbc9fdc5
Namespace:      default
Selector:       app=nginx,pod-template-hash=66bbc9fdc5
Labels:         app=nginx
                pod-template-hash=66bbc9fdc5
Annotations:    deployment.kubernetes.io/desired-replicas: 3
                deployment.kubernetes.io/max-replicas: 4
                deployment.kubernetes.io/revision: 1
Controlled By:  Deployment/nginx
Replicas:       3 current / 3 desired
Pods Status:    3 Running / 0 Waiting / 0 Succeeded / 0 Failed
Pod Template:
  Labels:  app=nginx
           pod-template-hash=66bbc9fdc5
  Containers:
   nginx:
    Image:        nginx:1.15.2
    Port:         none
    Host Port:    none
    Environment:  none
    Mounts:       none
  Volumes:        none
Events:
  Type    Reason            Age   From                   Message
  ----    ------            ----  ----                   -------
  Normal  SuccessfulCreate  30m   replicaset-controller  Created pod: nginx-66bbc9fdc5-x4x67
  Normal  SuccessfulCreate  30m   replicaset-controller  Created pod: nginx-66bbc9fdc5-sdhnh
  Normal  SuccessfulCreate  30m   replicaset-controller  Created pod: nginx-66bbc9fdc5-llhnn

可以清晰的看到3个pod实例是通过Replica set生成的;
kubectl describe podnginx-66bbc9fdc5-x4x67

[root@k8s-master01 ~]# for i in `kubectl get pod | awk 'NR1{print $1}'`;do echo $i;kubectl describe pod $i | tail -n 6;done
nginx-66bbc9fdc5-llhnn
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  42m   default-scheduler  Successfully assigned default/nginx-66bbc9fdc5-llhnn to k8s-master03
  Normal  Pulled     42m   kubelet            Container image "nginx:1.15.2" already present on machine
  Normal  Created    42m   kubelet            Created container nginx
  Normal  Started    42m   kubelet            Started container nginx
nginx-66bbc9fdc5-sdhnh
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  42m   default-scheduler  Successfully assigned default/nginx-66bbc9fdc5-sdhnh to k8s-master01
  Normal  Pulled     42m   kubelet            Container image "nginx:1.15.2" already present on machine
  Normal  Created    42m   kubelet            Created container nginx
  Normal  Started    42m   kubelet            Started container nginx
nginx-66bbc9fdc5-x4x67
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  42m   default-scheduler  Successfully assigned default/nginx-66bbc9fdc5-x4x67 to k8s-node02
  Normal  Pulled     42m   kubelet            Container image "nginx:1.15.2" already present on machine
  Normal  Created    42m   kubelet            Created container nginx
  Normal  Started    42m   kubelet            Started container nginx

从上面一步一步的剖析,我们可以清晰的看到Pod的产生过程:配置完deployment的yaml文件之后:

  • 通过kubectl create 创建一个deployment,那么此时就会调用deployment-controller(deployment控制器)创建一个replica set
  • replica set调用replicaset-controller创建pod
  • Pod创建完之后就会由启用资源调度程序,pod分配对应的node节点,由kubelet管理pod

从上可以知道replicaset controller 是由deployment创建的, 它是deployment的一部分。它是 replicaset 资源对象的控制器,其通过对replicaset、pod 2种资源的监听,当这2种资源发生变化时会触发 replicaset controller 对相应的replicaset对象进行调谐操作,从而完成replicaset期望副本数的调谐,当实际pod的数量未达到预期时创建pod,当实际pod的数量超过预期时删除pod。
replicaset controller主要作用是根据replicaset对象所期望的pod数量与现存pod数量做比较,然后根据比较结果创建/删除pod,最终使得replicaset对象所期望的pod数量与现存pod数量相等

源码分析

  • ctx.AvailableResources:可用的 GVR,由 cmd/kube-controller-manager/app.GetAvailableResources 通过 pkg/controller.SimpleControllerClientBuilder 创建的 client-go/kubernetes.Clientset 调用 kube-apiserver 的接口获得。
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return nil, false, nil
    }
    // 创建并启动控制器
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

eventBroadcaster:记录 ReplicaSet 处理时发生的一些事件,在 kubectl get events 和 kubectl describe 中可以看到

func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
    )
}
  • burstReplicas:一次 sync 中创建/删除的 Pod 数量限制
  • expectations:用于判断 rs 所期望的 Pod 的数量是否满足了
  • queue:与 DeploymentController.queue 一样
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
        // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
        // way of achieving this is by performing a `stop` operation on the replica set.
        DeleteFunc: rsc.enqueueReplicaSet,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

启动

与 DeploymentController 类似。

/ Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    // recover
    defer utilruntime.HandleCrash()
    // 关闭工作队列,停止所有 worker
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    klog.Infof("Starting %v controller", controllerName)
    defer klog.Infof("Shutting down %v controller", controllerName)

    // 等待 Lister 完成一遍同步
    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    // 启动指定数量的 worker 来处理资源对象变更
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    // 等待控制器停止
    <-stopCh
}

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    // 处理出错,尝试重新入队
    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}

Informer监听到的变更最终会回调到syncReplicaSet方法上,但当中会穿越多个协程,逻辑比较复杂。用一个时序图近似表示如下:

  • Controller变更通知
    这里的controller是Informer层的结构,对于资源变更会触发HandleDeltas()方法。HandleDeltas方法会调用sharedProcessor.distribute方法,将Delta传入到processListener的channel上,等待被处理。
  • processorListener.run
    run方法会不断拉取listener自己本地channel中的变更,并根据ActionType分发到注册的handler上的不同方法里。
    在上文介绍的NewReplicaSetController()函数里,可以看到AddFunc对应的回调函数是enqueueReplicaSet。最终会把delta放入ReplicaSetController自己的queue队列中,等待controller处理。
  • ReplicaSetController.processNextItem
    processNextItem方法会处理RepliaSetController.queue当中的变更信息,最终调用syncReplicaSet方法来处理变更,确保Pods和配置一致。
image.png

控制循环流程图


image.png

任务入队

Expectation 机制

expectations记录了replicaset对象在某一次调谐中期望创建/删除的pod数量,pod创建/删除完成后,该期望数会相应的减少,当期望创建/删除的pod数量小于等于0时,说明上一次调谐中期望创建/删除的pod数量已经达到,调用rsc.expectations.SatisfiedExpectations方法返回true。

根据前面的分析,在replicaset controller对replicaset对象进行调谐操作时,首先会调用rsc.expectations.SatisfiedExpectations方法,返回true且replicaset对象的deletetimestamp为空,才会调用rsc.manageReplicas方法进行期望副本数的调谐操作,也即pod的创建/删除操作。

replicaset controller expectations机制分析
这个 expectations 机制的作用是什么?下面来分析一下。

以创建1000个副本的replicaset为例,分析下expectations的作用。根据前面对replicaset controller的核心处理分析可以得知,1000个pod将通过两次对replicaset对象的调谐,每次500个进行创建。

直接看到replicaset controller的核心处理逻辑方法syncReplicaSet

syncReplicaSet

每次调用rsc.manageReplicas方法前,都会调用rsc.expectations.SatisfiedExpectations来判断是否可以进行replicaset期望副本的调谐操作(pod的创建删除操作),返回true时才会调用rsc.manageReplicas方法。

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    ...

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
        return nil
    }

    ...

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    
    ...
}

rsc.expectations.SatisfiedExpectations

接下来看到rsc.expectations.SatisfiedExpectations方法,主要是用于判断是否需要在syncReplicaSet核心处理方法中调用rsc.manageReplicas方法来进行pod的创建删除操作。

  • 第一次进来(首次创建replicaset)时r.GetExpectations找不到该rs对象对应的expectations,exists的值为false,所以rsc.expectations.SatisfiedExpectations方法返回true,也就是说syncReplicaSet方法中会调用rsc.manageReplicas方法来进行pod的创建操作,并在rsc.manageReplicas方法中设置expectations为期望创建500个pod;
  • 在第一次创建500个pod的操作没有完成之前,以及第一次创建500个pod的操作开始后的5分钟之内,exp.Fulfilled与exp.isExpired都返回false,所以rsc.expectations.SatisfiedExpectations方法返回false,也就是说syncReplicaSet方法中不会调用rsc.manageReplicas方法来进行pod的创建操作;
  • 在第一次创建500个pod的操作完成之后,或者第一次创建500个pod操作进行了5分钟有余,则exp.Fulfilled或exp.isExpired会返回true,所以rsc.expectations.SatisfiedExpectations方法返回true,也就是说syncReplicaSet方法中会调用rsc.manageReplicas方法来进行第二次500个pod的创建操作,并在rsc.manageReplicas方法中再次设置expectations为期望创建500个pod。
// pkg/controller/controller_utils.go
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        if exp.Fulfilled() {
            klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
            return true
        } else if exp.isExpired() {
            klog.V(4).Infof("Controller expectations expired %#v", exp)
            return true
        } else {
            klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
            return false
        }
    } else if err != nil {
        klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
    } else {
        // When a new controller is created, it doesn't have expectations.
        // When it doesn't see expected watch events for > TTL, the expectations expire.
        //  - In this case it wakes up, creates/deletes controllees, and sets expectations again.
        // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
        //  - In this case it continues without setting expectations till it needs to create/delete controllees.
        klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
    }
    // Trigger a sync if we either encountered and error (which shouldn't happen since we're
    // getting from local store) or this controller hasn't established expectations.
    return true
}

exp.Fulfilled

判断replicaset对象的expectations里的期望创建pod数量以及期望删除pod数量,都小于等于0时返回true。

// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
    // TODO: think about why this line being atomic doesn't matter
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

exp.isExpired

判断replicaset对象上次设置expectations时的时间距离现在的时间是否已经超过5分钟,是则返回true。

func (exp *ControlleeExpectations) isExpired() bool {
    return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}

核心处理方法,主要是根据replicaset所期望的pod数量与现存pod数量做比较,然后根据比较结果创建/删除pod,最终使得replicaset对象所期望的pod数量与现存pod数量相等。

(1)创建pod之前,会调用rsc.expectations.ExpectCreations来设置Expectations:(key,add:500,del:0);
(2)调用slowStartBatch来执行pod的创建;
(3)创建完pod之后,判断是否有创建失败的pod,并根据创建失败的pod数量,调用rsc.expectations.CreationObserved减去Expectations中相应的add的值。

// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    
    ...
    
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            ...
        })

        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
    ...

replicaset controller第一次创建了500个pod之后,通过replicaset controller对pod新增事件的watch,然后调用rsc.expectations.CreationObserved方法将Expectations中期望创建的pod数量减1,以及rsc.manageReplicas方法中对创建失败的pod数量,调用相应次数的rsc.expectations.CreationObserved方法将Expectations中期望创建的pod数量相应减少,最终使该replicaset对象的Expectations的值将变为:(key,add:0,del:0),这样在下次对该replicaset对象的调谐操作中,即可进行下一批次的500个pod的创建。

rsc.manageReplicas:如果 rsc.expectations.SatisfiedExpectations(key) && rs.DeletionTimestamp == nil,检查下是否还需要创建/删除 Pod
diff = len(filteredPods) - rs.Spec.Replicas

  • 如果 diff < 0,说明还需要创建 Pod
  • creationCount = min(-diff, rsc.burstReplicas)
  • rsc.expectations.ExpectCreations(rsKey, creationCount)
  • slowStartBatch:批量创建 Pod,返回创建成功的 Pod 数量 successfulCreations
  • skippedPods = creationCount - successfulCreations:需要从 expectations 减掉没有创建的 Pod 数量 rsc.expectations.CreationObserved(rsKey)
  • 如果 diff > 0,说明还需要删除 Pod
  • deletionCount = min(diff, rsc.burstReplicas)
  • podsToDelete = getPodsToDelete(filteredPods, deletionCount):尽量删除不健康的 Pod
  • rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
  • 遍历 podsToDelete 逐个删除,并且更新 expectations:rsc.expectations.DeletionObserved(rsKey, podKey)
  • calculateStatus:计算新的 rs.Status
  • Replicas:len(filteredPods)
  • FullyLabeledReplicas:pod.Labels 匹配 rs.Spec.Template.Labels 的 Pod 数量
  • ReadyReplicas:GetPodCondition(pod.Status, v1.PodReady).Status == v1.ConditionTrue 的 Pod 数量
  • AvailableReplicas:处于 Ready 状态,并且持续 rs.Spec.MinReadySeconds 时长的 Pod 数量
    -rsc.queue.AddAfter:延迟 updatedRS.Spec.MinReadySeconds 秒后把 rs 重新入队
    核心创建删除pod方法,主要是根据replicaset所期望的pod数量与现存pod数量做比较,然后根据比较结果来创建/删除pod,最终使得replicaset对象所期望的pod数量与现存pod数量相等,需要特别注意的是,每一次调用rsc.manageReplicas方法,创建/删除pod的个数上限为500。

在replicaset对象的调谐中,rsc.manageReplicas方法不一定每一次都会调用执行,只有当rsc.expectations.SatisfiedExpectations方法返回true,且replicaset对象的DeletionTimestamp属性为空时,才会进行rsc.manageReplicas方法的调用。
diff = 现存pod数量 - 期望的pod数量

diff := len(filteredPods) - int(*(rs.Spec.Replicas))
(1)当现存pod数量比期望的少时,需要创建pod,进入创建pod的逻辑代码块。
(2)当现存pod数量比期望的多时,需要删除pod,进入删除pod的逻辑代码块。

一次同步操作中批量创建或删除pod的个数上限为rsc.burstReplicas,即500个。

// pkg/controller/replicaset/replica_set.go
const (
    // Realistic value of the burstReplica field for the replica set manager based off
    // performance requirements for kubernetes 1.0.
    BurstReplicas = 500

    // The number of times we retry updating a ReplicaSet's status.
    statusUpdateRetries = 1
)
    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }
rsc.expectations.ExpectCreations

设置replicaset对象的expectations。

// pkg/controller/controller_utils.go
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
    return r.SetExpectations(controllerKey, adds, 0)
}

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
    exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
    klog.V(4).Infof("Setting expectations %#v", exp)
    return r.Add(exp)
}
rsc.expectations.CreationObserved

将replicaset对象expectations中期望创建的pod数量减1.

// pkg/controller/controller_utils.go
// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
    r.LowerExpectations(controllerKey, 1, 0)
}

// Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
    if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
        exp.Add(int64(-add), int64(-del))
        // The expectations might've been modified since the update on the previous line.
        klog.V(4).Infof("Lowered expectations %#v", exp)
    }
}

那正常情况下(即没有pod创建异常)Expectations在什么时候会更新为(key,add:0,del:0)呢,继续看下面的分析。

pod add event handlerFunc-addPod
replicaset controller会监听pod的新增事件,每成功创建出一个pod,会调用addPod方法。在addPod方法中,同样会调用一次rsc.expectations.CreationObserved,将Expectations中期望创建的pod数量减1。

// pkg/controller/replicaset/replica_set.go
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)

    ...
    
    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        rsc.expectations.CreationObserved(rsKey)
        rsc.queue.Add(rsKey)
        return
    }

    ...
}

rsc.manageReplicas

核心创建删除pod方法,主要是根据replicaset所期望的pod数量与现存pod数量做比较,然后根据比较结果来创建/删除pod,最终使得replicaset对象所期望的pod数量与现存pod数量相等,需要特别注意的是,每一次调用rsc.manageReplicas方法,创建/删除pod的个数上限为500。

在replicaset对象的调谐中,rsc.manageReplicas方法不一定每一次都会调用执行,只有当rsc.expectations.SatisfiedExpectations方法返回true,且replicaset对象的DeletionTimestamp属性为空时,才会进行rsc.manageReplicas方法的调用。
diff = 现存pod数量 - 期望的pod数量

diff := len(filteredPods) - int(*(rs.Spec.Replicas))
(1)当现存pod数量比期望的少时,需要创建pod,进入创建pod的逻辑代码块。
(2)当现存pod数量比期望的多时,需要删除pod,进入删除pod的逻辑代码块。

一次同步操作中批量创建或删除pod的个数上限为rsc.burstReplicas,即500个。

// pkg/controller/replicaset/replica_set.go
const (
    // Realistic value of the burstReplica field for the replica set manager based off
    // performance requirements for kubernetes 1.0.
    BurstReplicas = 500

    // The number of times we retry updating a ReplicaSet's status.
    statusUpdateRetries = 1
)
    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }

接下来分析一下创建/删除pod的逻辑代码块。

创建pod逻辑代码块

主要逻辑:
(1)运算获取需要创建的pod数量,并设置数量上限500;
(2)调用rsc.expectations.ExpectCreations,将本轮调谐期望创建的pod数量设置进expectations;
(3)调用slowStartBatch函数来对pod进行创建逻辑处理;
(4)调用slowStartBatch函数完成后,计算获取创建失败的pod的数量,然后调用相应次数的rsc.expectations.CreationObserved方法,减去本轮调谐中期望创建的pod数量。
为什么要减呢?因为expectations记录了replicaset对象在某一次调谐中期望创建/删除的pod数量,pod创建/删除完成后,replicaset controller会watch到pod的创建/删除事件,从而调用rsc.expectations.CreationObserved方法来使期望创建/删除的pod数量减少。当有相应数量的pod创建/删除失败后,replicaset controller是不会watch到相应的pod创建/删除事件的,所以必须把本轮调谐期望创建/删除的pod数量做相应的减法,否则本轮调谐中的期望创建/删除pod数量永远不可能小于等于0,这样的话,rsc.expectations.SatisfiedExpectations方法就只会等待expectations超时期限到达才会返回true了。

        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        
        rsc.expectations.ExpectCreations(rsKey, diff)
        glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            boolPtr := func(b bool) *bool { return &b }
            controllerRef := &metav1.OwnerReference{
                APIVersion:         rsc.GroupVersion().String(),
                Kind:               rsc.Kind,
                Name:               rs.Name,
                UID:                rs.UID,
                BlockOwnerDeletion: boolPtr(true),
                Controller:         boolPtr(true),
            }
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
            if err != nil && errors.IsTimeout(err) {
                // Pod is created but its initialization has timed out.
                // If the initialization is successful eventually, the
                // controller will observe the creation via the informer.
                // If the initialization fails, or if the pod keeps
                // uninitialized for a long time, the informer will not
                // receive any update, and the controller will create a new
                // pod when the expectation expires.
                return nil
            }
            return err
        })

        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err

slowStartBatch

来看到slowStartBatch,可以看到创建pod的算法为:
(1)每次批量创建的 pod 数依次为 1、2、4、8......,呈指数级增长,起与要创建的pod数量相同的goroutine来负责创建pod。
(2)创建pod按1、2、4、8...的递增趋势分多批次进行,若某批次创建pod有失败的(如apiserver限流,丢弃请求等,注意:超时除外,因为initialization处理有可能超时),则后续批次不再进行,结束本次函数调用。

// pkg/controller/replicaset/replica_set.go
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        curSuccesses := batchSize - len(errCh)
        successes += curSuccesses
        if len(errCh) > 0 {
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}
rsc.podControl.CreatePodsWithControllerRef

前面定义的创建pod时调用的方法为rsc.podControl.CreatePodsWithControllerRef。

func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
    if err := validateControllerRef(controllerRef); err != nil {
        return err
    }
    return r.createPods("", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
    pod, err := GetPodFromTemplate(template, object, controllerRef)
    if err != nil {
        return err
    }
    if len(nodeName) != 0 {
        pod.Spec.NodeName = nodeName
    }
    if len(labels.Set(pod.Labels)) == 0 {
        return fmt.Errorf("unable to create pods, no labels")
    }
    newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
    if err != nil {
        // only send an event if the namespace isn't terminating
        if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
            r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
        }
        return err
    }
    accessor, err := meta.Accessor(object)
    if err != nil {
        klog.Errorf("parentObject does not have ObjectMeta, %v", err)
        return nil
    }
    klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
    r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)

    return nil
}

删除逻辑代码块

主要逻辑:
(1)运算获取需要删除的pod数量,并设置数量上限500;
(2)根据要缩容删除的pod数量,先调用getPodsToDelete函数找出需要删除的pod列表;
(3)调用rsc.expectations.ExpectCreations,将本轮调谐期望删除的pod数量设置进expectations;
(4)每个pod拉起一个goroutine,调用rsc.podControl.DeletePod来删除该pod;
(5)对于删除失败的pod,会调用rsc.expectations.DeletionObserved方法,减去本轮调谐中期望创建的pod数量。
至于为什么要减,原因跟上面创建逻辑代码块中分析的一样。
(6)等待所有gorouutine完成,return返回。

if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff)

        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }

getPodsToDelete

getPodsToDelete:根据要缩容删除的pod数量,然后返回需要删除的pod列表。

// pkg/controller/replicaset/replica_set.go
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
    // No need to sort pods if we are about to delete all of them.
    // diff will always be <= len(filteredPods), so not need to handle > case.
    if diff < len(filteredPods) {
        podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
        sort.Sort(podsWithRanks)
    }
    return filteredPods[:diff]
}

func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
    podsOnNode := make(map[string]int)
    for _, pod := range relatedPods {
        if controller.IsPodActive(pod) {
            podsOnNode[pod.Spec.NodeName]++
        }
    }
    ranks := make([]int, len(podsToRank))
    for i, pod := range podsToRank {
        ranks[i] = podsOnNode[pod.Spec.NodeName]
    }
    return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
}

筛选要删除的pod逻辑
按照下面的排序规则,从上到下进行排序,各个条件相互互斥,符合其中一个条件则排序完成:
(1)优先删除没有绑定node的pod;
(2)优先删除处于Pending状态的pod,然后是Unknown,最后才是Running;
(3)优先删除Not ready的pod,然后才是ready的pod;
(4)按同node上所属replicaset的pod数量排序,优先删除所属replicaset的pod数量多的node上的pod;
(5)按pod ready的时间排序,优先删除ready时间最短的pod;
(6)优先删除pod中容器重启次数较多的pod;
(7)按pod创建时间排序,优先删除创建时间最短的pod。

// pkg/controller/controller_utils.go
func (s ActivePodsWithRanks) Less(i, j int) bool {
    // 1. Unassigned < assigned
    // If only one of the pods is unassigned, the unassigned one is smaller
    if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
        return len(s.Pods[i].Spec.NodeName) == 0
    }
    // 2. PodPending < PodUnknown < PodRunning
    if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
        return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
    }
    // 3. Not ready < ready
    // If only one of the pods is not ready, the not ready one is smaller
    if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
        return !podutil.IsPodReady(s.Pods[i])
    }
    // 4. Doubled up < not doubled up
    // If one of the two pods is on the same node as one or more additional
    // ready pods that belong to the same replicaset, whichever pod has more
    // colocated ready pods is less
    if s.Rank[i] != s.Rank[j] {
        return s.Rank[i] > s.Rank[j]
    }
    // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
    //       see https://github.com/kubernetes/kubernetes/issues/22065
    // 5. Been ready for empty time < less time < more time
    // If both pods are ready, the latest ready one is smaller
    if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
        readyTime1 := podReadyTime(s.Pods[i])
        readyTime2 := podReadyTime(s.Pods[j])
        if !readyTime1.Equal(readyTime2) {
            return afterOrZero(readyTime1, readyTime2)
        }
    }
    // 6. Pods with containers with higher restart counts < lower restart counts
    if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
        return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
    }
    // 7. Empty creation time pods < newer pods < older pods
    if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
        return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
    }
    return false
}

rsc.podControl.DeletePod

删除pod的方法。

// pkg/controller/controller_utils.go
func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
    accessor, err := meta.Accessor(object)
    if err != nil {
        return fmt.Errorf("object does not have ObjectMeta, %v", err)
    }
    klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
    if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil && !apierrors.IsNotFound(err) {
        r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
        return fmt.Errorf("unable to delete pods: %v", err)
    }
    r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)

    return nil
}

calculateStatus

calculateStatus函数计算并返回replicaset对象的status。

怎么计算status呢?
(1)根据现存pod数量、Ready状态的pod数量、availabel状态的pod数量等,给replicaset对象的status的Replicas、ReadyReplicas、AvailableReplicas等字段赋值;
(2)根据replicaset对象现有status中的condition配置以及前面调用rsc.manageReplicas方法后是否有错误,来决定给status新增condition或移除condition,conditionType为ReplicaFailure。

当调用rsc.manageReplicas方法出错,且replicaset对象的status中,没有conditionType为ReplicaFailure的condition,则新增conditionType为ReplicaFailure的condition,表示该replicaset创建/删除pod出错;
当调用rsc.manageReplicas方法没有任何错误,且replicaset对象的status中,有conditionType为ReplicaFailure的condition,则去除该condition,表示该replicaset创建/删除pod成功。

func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
    newStatus := rs.Status
    // Count the number of pods that have labels matching the labels of the pod
    // template of the replica set, the matching pods may have more
    // labels than are in the template. Because the label of podTemplateSpec is
    // a superset of the selector of the replica set, so the possible
    // matching pods must be part of the filteredPods.
    fullyLabeledReplicasCount := 0
    readyReplicasCount := 0
    availableReplicasCount := 0
    templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
    for _, pod := range filteredPods {
        if templateLabel.Matches(labels.Set(pod.Labels)) {
            fullyLabeledReplicasCount++
        }
        if podutil.IsPodReady(pod) {
            readyReplicasCount++
            if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
                availableReplicasCount++
            }
        }
    }

    failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
    if manageReplicasErr != nil && failureCond == nil {
        var reason string
        if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
            reason = "FailedCreate"
        } else if diff > 0 {
            reason = "FailedDelete"
        }
        cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
        SetCondition(&newStatus, cond)
    } else if manageReplicasErr == nil && failureCond != nil {
        RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
    }

    newStatus.Replicas = int32(len(filteredPods))
    newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
    newStatus.ReadyReplicas = int32(readyReplicasCount)
    newStatus.AvailableReplicas = int32(availableReplicasCount)
    return newStatus
}

updateReplicaSetStatus

主要逻辑:
(1)判断新计算出来的status中的各个属性如Replicas、ReadyReplicas、AvailableReplicas以及Conditions是否与现存replicaset对象的status中的一致,一致则不用做更新操作,直接return;
(2)调用c.UpdateStatus更新replicaset的status。

// pkg/controller/replicaset/replica_set_utils.go
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
    // This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
    // we do a periodic relist every 30s. If the generations differ but the replicas are
    // the same, a caller might've resized to the same replica count.
    if rs.Status.Replicas == newStatus.Replicas &&
        rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
        rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
        rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
        rs.Generation == rs.Status.ObservedGeneration &&
        reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
        return rs, nil
    }

    // Save the generation number we acted on, otherwise we might wrongfully indicate
    // that we've seen a spec update when we retry.
    // TODO: This can clobber an update if we allow multiple agents to write to the
    // same status.
    newStatus.ObservedGeneration = rs.Generation

    var getErr, updateErr error
    var updatedRS *apps.ReplicaSet
    for i, rs := 0, rs; ; i++ {
        klog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
            fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
            fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
            fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
            fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
            fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))

        rs.Status = newStatus
        updatedRS, updateErr = c.UpdateStatus(rs)
        if updateErr == nil {
            return updatedRS, nil
        }
        // Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
        if i >= statusUpdateRetries {
            break
        }
        // Update the ReplicaSet with the latest resource version for the next poll
        if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
            // If the GET fails we can't trust status.Replicas anymore. This error
            // is bound to be more interesting than the update failure.
            return nil, getErr
        }
    }

    return nil, updateErr
}
c.UpdateStatus
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/replicaset.go
func (c *replicaSets) UpdateStatus(replicaSet *v1.ReplicaSet) (result *v1.ReplicaSet, err error) {
    result = &v1.ReplicaSet{}
    err = c.client.Put().
        Namespace(c.ns).
        Resource("replicasets").
        Name(replicaSet.Name).
        SubResource("status").
        Body(replicaSet).
        Do().
        Into(result)
    return
}

总结

image

replicaset controller核心处理逻辑

replicaset controller的核心处理逻辑是根据replicaset对象里期望的pod数量以及现存pod数量的比较,当期望pod数量比现存pod数量多时,调用创建pod算法创建出新的pod,直至达到期望数量;当期望pod数量比现存pod数量少时,调用删除pod算法,并根据一定的策略对现存pod列表做排序,从中按顺序选择多余的pod然后删除,直至达到期望数量。

image

replicaset controller创建pod算法

replicaset controller创建pod的算法是,按1、2、4、8...的递增趋势分多批次进行(每次调谐中创建pod的数量上限为500个,超过上限的会在下次调谐中再创建),若某批次创建pod有失败的(如apiserver限流,丢弃请求等,注意:超时除外,因为initialization处理有可能超时),则后续批次的pod创建不再进行,需等待该repliaset对象下次调谐时再触发该pod创建算法,进行pod的创建,直至达到期望数量。

replicaset controller删除pod算法

replicaset controller删除pod的算法是,先根据一定的策略将现存pod列表做排序,然后按顺序从中选择指定数量的pod,拉起与要删除的pod数量相同的goroutine来删除pod(每次调谐中删除pod的数量上限为500个),并等待所有goroutine执行完成。删除pod有失败的(如apiserver限流,丢弃请求)或超过500上限的部分,需等待该repliaset对象下次调谐时再触发该pod删除算法,进行pod的删除,直至达到期望数量。

筛选要删除的pod逻辑

按照下面的排序规则,从上到下进行排序,各个条件相互互斥,符合其中一个条件则排序完成:
(1)优先删除没有绑定node的pod;
(2)优先删除处于Pending状态的pod,然后是Unknown,最后才是Running;
(3)优先删除Not ready的pod,然后才是ready的pod;
(4)按同node上所属replicaset的pod数量排序,优先删除所属replicaset的pod数量多的node上的pod;
(5)按pod ready的时间排序,优先删除ready时间最短的pod;
(6)优先删除pod中容器重启次数较多的pod;
(7)按pod创建时间排序,优先删除创建时间最短的pod

expectations机制作用总结

expectations的过期时间机制解决了某一批次创建/删除pod因某些原因一直卡住不能完成而导致的replicaset期望副本数永远达不到预期的问题。

expectations.SatisfiedExpectations返回true,则进入核心处理方法rsc.manageReplicas,根据replicaset所期望的pod数量与现存pod数量做比较,判断是否需要进行下一批次的创建/删除pod的任务。

综上可以看出,expectations主要用于控制让多个创建/删除pod批次串行执行,不让其并行执行,防止了并发执行所可能产生的重复删除pod、创建出replicaset所期望的pod数量以外的多余的pod等问题(当replicaset对象的某一创建/删除pod的批次还在进行中,这时再次进行pod的创建删除操作,如果没有expectations的判断控制,就会再次进行pod的批量创建/删除时,从而导致该问题的发生)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容