An analysis of the relationship between the Deployment and ReplicaSet controllers
Let's start with the YAML of a Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2021-01-11T02:54:57Z"
  generation: 1
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: nginx
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:1.15.2
        imagePullPolicy: IfNotPresent
        name: nginx
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
kubectl create -f nginx-deploy.yaml # create the Deployment
[root@k8s-master01 ~]# kubectl get deployment nginx -owide
NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES SELECTOR
nginx 3/3 3 3 8m38s nginx nginx:1.15.2 app=nginx
[root@k8s-master01 ~]# kubectl get pod -owide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
nginx-66bbc9fdc5-llhnn 1/1 Running 0 8m55s 172.18.195.60 k8s-master03 <none> <none>
nginx-66bbc9fdc5-sdhnh 1/1 Running 0 8m55s 172.25.244.203 k8s-master01 <none> <none>
nginx-66bbc9fdc5-x4x67 1/1 Running 0 8m55s 172.27.14.230 k8s-node02 <none> <none>
Now inspect the events of the resulting nginx Deployment to confirm that the Deployment created a ReplicaSet:
# kubectl describe deploy nginx
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal ScalingReplicaSet 10m deployment-controller Scaled up replica set nginx-66bbc9fdc5 to 3
Confirm the ReplicaSet:
[root@k8s-master01 ~]# kubectl get rs nginx-66bbc9fdc5
NAME DESIRED CURRENT READY AGE
nginx-66bbc9fdc5 3 3 3 29m
[root@k8s-master01 ~]# kubectl describe rs nginx-66bbc9fdc5
Name: nginx-66bbc9fdc5
Namespace: default
Selector: app=nginx,pod-template-hash=66bbc9fdc5
Labels: app=nginx
pod-template-hash=66bbc9fdc5
Annotations: deployment.kubernetes.io/desired-replicas: 3
deployment.kubernetes.io/max-replicas: 4
deployment.kubernetes.io/revision: 1
Controlled By: Deployment/nginx
Replicas: 3 current / 3 desired
Pods Status: 3 Running / 0 Waiting / 0 Succeeded / 0 Failed
Pod Template:
Labels: app=nginx
pod-template-hash=66bbc9fdc5
Containers:
nginx:
Image: nginx:1.15.2
Port: <none>
Host Port: <none>
Environment: <none>
Mounts: <none>
Volumes: <none>
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 30m replicaset-controller Created pod: nginx-66bbc9fdc5-x4x67
Normal SuccessfulCreate 30m replicaset-controller Created pod: nginx-66bbc9fdc5-sdhnh
Normal SuccessfulCreate 30m replicaset-controller Created pod: nginx-66bbc9fdc5-llhnn
We can clearly see that the three Pod instances were created by the ReplicaSet.
kubectl describe pod nginx-66bbc9fdc5-x4x67
[root@k8s-master01 ~]# for i in `kubectl get pod | awk 'NR>1{print $1}'`;do echo $i;kubectl describe pod $i | tail -n 6;done
nginx-66bbc9fdc5-llhnn
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 42m default-scheduler Successfully assigned default/nginx-66bbc9fdc5-llhnn to k8s-master03
Normal Pulled 42m kubelet Container image "nginx:1.15.2" already present on machine
Normal Created 42m kubelet Created container nginx
Normal Started 42m kubelet Started container nginx
nginx-66bbc9fdc5-sdhnh
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 42m default-scheduler Successfully assigned default/nginx-66bbc9fdc5-sdhnh to k8s-master01
Normal Pulled 42m kubelet Container image "nginx:1.15.2" already present on machine
Normal Created 42m kubelet Created container nginx
Normal Started 42m kubelet Started container nginx
nginx-66bbc9fdc5-x4x67
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 42m default-scheduler Successfully assigned default/nginx-66bbc9fdc5-x4x67 to k8s-node02
Normal Pulled 42m kubelet Container image "nginx:1.15.2" already present on machine
Normal Created 42m kubelet Created container nginx
Normal Started 42m kubelet Started container nginx
This step-by-step walkthrough shows clearly how the Pods come into existence. Once the Deployment's YAML is written:
- kubectl create creates the Deployment; the deployment-controller reacts and creates a ReplicaSet for it
- the replicaset-controller reacts to the new ReplicaSet and creates the Pods
- once the Pods exist, the scheduler assigns each Pod to a node, where it is managed by the kubelet
So the ReplicaSet is created by the Deployment and is, in effect, the mechanism through which a Deployment manages its Pods. The replicaset controller, which runs inside kube-controller-manager, is the controller for ReplicaSet objects: it watches the ReplicaSet and Pod resource types, and when either changes it reconciles the affected ReplicaSet toward its desired replica count, creating Pods when the actual count falls short of the expectation and deleting Pods when the actual count exceeds it.
In other words, the replicaset controller's main job is to compare the number of Pods a ReplicaSet desires with the number that currently exist, and to create or delete Pods based on that comparison until the existing Pod count equals the desired count.
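That ownership chain (Deployment → ReplicaSet → Pod) is expressed through OwnerReferences, which is also what the Controlled By: Deployment/nginx line in the describe output above reflects. As a minimal sketch of walking that chain (the filterOwnedReplicaSets helper is hypothetical, for illustration only; metav1.GetControllerOf is the real library helper the controllers themselves use):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// filterOwnedReplicaSets returns the ReplicaSets whose controlling
// OwnerReference points at the given Deployment (hypothetical helper).
func filterOwnedReplicaSets(d *appsv1.Deployment, rss []*appsv1.ReplicaSet) []*appsv1.ReplicaSet {
	var owned []*appsv1.ReplicaSet
	for _, rs := range rss {
		// GetControllerOf returns the OwnerReference with Controller=true,
		// or nil if the object has no controlling owner.
		if ref := metav1.GetControllerOf(rs); ref != nil && ref.UID == d.UID {
			owned = append(owned, rs)
		}
	}
	return owned
}

func main() {
	ctrl := true
	dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "nginx", UID: "dep-uid"}}
	rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{
		Name: "nginx-66bbc9fdc5",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "apps/v1", Kind: "Deployment",
			Name: "nginx", UID: "dep-uid", Controller: &ctrl,
		}},
	}}
	fmt.Println(len(filterOwnedReplicaSets(dep, []*appsv1.ReplicaSet{rs}))) // 1
}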
Source code analysis
- ctx.AvailableResources: the available GVRs, obtained by cmd/kube-controller-manager/app.GetAvailableResources, which queries the kube-apiserver through a client-go/kubernetes.Clientset created by pkg/controller.SimpleControllerClientBuilder.
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
	// create and start the controller
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}
eventBroadcaster: records events emitted while processing ReplicaSets; they show up in kubectl get events and kubectl describe.
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
		"replicaset_controller",
		"replicaset",
		controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
		},
	)
}
- burstReplicas: the upper limit on the number of Pods created/deleted in one sync
- expectations: used to decide whether the Pod count the rs expects has been satisfied
- queue: the same as DeploymentController.queue
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.enqueueReplicaSet,
		UpdateFunc: rsc.updateRS,
		// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
		// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
		// way of achieving this is by performing a `stop` operation on the replica set.
		DeleteFunc: rsc.enqueueReplicaSet,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}
Startup
Similar to the DeploymentController.
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	// recover from panics in this goroutine
	defer utilruntime.HandleCrash()
	// shut down the work queue, stopping all workers
	defer rsc.queue.ShutDown()

	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)

	// wait for the listers to complete an initial sync
	if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}

	// start the requested number of workers to process object changes
	for i := 0; i < workers; i++ {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	// block until the controller is stopped
	<-stopCh
}
func (rsc *ReplicaSetController) worker() {
	for rsc.processNextWorkItem() {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	// processing failed; re-enqueue with rate limiting
	utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
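The key taken off the queue is the object's namespace/name string produced by the informer helper functions. A small standalone sketch of how such keys are built and split (the ReplicaSet object here is fabricated; cache.MetaNamespaceKeyFunc and cache.SplitMetaNamespaceKey are the real client-go helpers, and controller.KeyFunc used by enqueueReplicaSet is an alias of the deletion-handling variant of the former):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// A fabricated ReplicaSet, standing in for what enqueueReplicaSet receives.
	rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default",
		Name:      "nginx-66bbc9fdc5",
	}}

	// Build the work-queue key for the object.
	key, _ := cache.MetaNamespaceKeyFunc(rs)
	fmt.Println(key) // default/nginx-66bbc9fdc5

	// syncReplicaSet later splits the key back apart.
	ns, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(ns, name) // default nginx-66bbc9fdc5
}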
Changes observed by the Informer eventually reach the syncReplicaSet method, but the path crosses several goroutines and is fairly involved. Approximately, the sequence is:
- Controller change notification: the controller here is the Informer-layer structure. A resource change triggers its HandleDeltas() method, which calls sharedProcessor.distribute to push the Delta onto each processorListener's channel, where it waits to be handled.
- processorListener.run: this loop keeps pulling changes off the listener's own channel and, based on the ActionType, dispatches them to the matching method of the registered handler. As shown in the NewReplicaSetController() function above, the AddFunc callback is enqueueReplicaSet, which ultimately puts the item's key into the ReplicaSetController's own queue to await processing.
- ReplicaSetController.processNextWorkItem: this processes the changes in ReplicaSetController.queue and finally calls syncReplicaSet to handle them, making the Pods match the spec.
Control loop flowchart
Task enqueueing
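The enqueue step itself is tiny. A sketch of enqueueReplicaSet, paraphrased from pkg/controller/replicaset/replica_set.go (details may vary slightly across versions):

// enqueueReplicaSet computes the namespace/name key of the ReplicaSet and
// adds it to the rate-limited work queue (paraphrased sketch).
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rsc.queue.Add(key)
}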
The expectations mechanism
expectations records how many Pod creations/deletions a ReplicaSet object expects from a given reconcile. As Pods are actually created/deleted, the expected counts are decremented; once both expected counts reach zero or below, the previous reconcile's expectations have been met and rsc.expectations.SatisfiedExpectations returns true.
As the earlier analysis showed, when the replicaset controller reconciles a ReplicaSet it first calls rsc.expectations.SatisfiedExpectations; only when that returns true and the ReplicaSet's DeletionTimestamp is empty does it call rsc.manageReplicas to reconcile the replica count, i.e. to create or delete Pods.
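For reference, the record behind this mechanism is essentially just a pair of atomic counters plus a timestamp; paraphrased from pkg/controller/controller_utils.go (field layout may differ slightly across versions):

// ControlleeExpectations tracks how many more creations (add) and deletions
// (del) the controller still expects to observe for one controller key, and
// when those expectations were set.
type ControlleeExpectations struct {
	add       int64 // outstanding expected creations, decremented by CreationObserved
	del       int64 // outstanding expected deletions, decremented by DeletionObserved
	key       string
	timestamp time.Time
}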
Analysis of the replicaset controller's expectations mechanism
What exactly does this expectations mechanism do? Let's analyze it.
Take a ReplicaSet with 1000 replicas as an example. From the core-processing analysis above, the 1000 Pods will be created across two reconciles of the ReplicaSet object, 500 Pods each.
Let's go straight to the controller's core processing method, syncReplicaSet.
syncReplicaSet
Before every call to rsc.manageReplicas, rsc.expectations.SatisfiedExpectations is called to decide whether the replica-count reconcile (Pod creation/deletion) may proceed; rsc.manageReplicas runs only when it returns true.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()
	...
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}
	...
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	...
}
rsc.expectations.SatisfiedExpectations
Next, the rsc.expectations.SatisfiedExpectations method: it decides whether the core syncReplicaSet method should call rsc.manageReplicas to create/delete Pods.
- On the first pass (when the ReplicaSet has just been created), r.GetExpectations finds no expectations for the rs, so exists is false and SatisfiedExpectations returns true. syncReplicaSet therefore calls rsc.manageReplicas to create Pods, and manageReplicas sets the expectations to "expect 500 creations".
- While the first batch of 500 creations has not yet completed, and within 5 minutes of it starting, exp.Fulfilled and exp.isExpired both return false, so SatisfiedExpectations returns false and syncReplicaSet does not call rsc.manageReplicas.
- Once the first batch of 500 creations completes, or more than 5 minutes have passed since it started, exp.Fulfilled or exp.isExpired returns true, so SatisfiedExpectations returns true. syncReplicaSet then calls rsc.manageReplicas to create the second batch of 500 Pods, and manageReplicas once again sets the expectations to "expect 500 creations".
// pkg/controller/controller_utils.go
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
			return true
		} else if exp.isExpired() {
			klog.V(4).Infof("Controller expectations expired %#v", exp)
			return true
		} else {
			klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
			return false
		}
	} else if err != nil {
		klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
	} else {
		// When a new controller is created, it doesn't have expectations.
		// When it doesn't see expected watch events for > TTL, the expectations expire.
		//   - In this case it wakes up, creates/deletes controllees, and sets expectations again.
		// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
		//   - In this case it continues without setting expectations till it needs to create/delete controllees.
		klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
	}
	// Trigger a sync if we either encountered and error (which shouldn't happen since we're
	// getting from local store) or this controller hasn't established expectations.
	return true
}
exp.Fulfilled
Returns true when both the expected-creation count and the expected-deletion count in the ReplicaSet's expectations are less than or equal to 0.
// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
	// TODO: think about why this line being atomic doesn't matter
	return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
exp.isExpired
Returns true if more than 5 minutes have elapsed since expectations were last set for the ReplicaSet.
func (exp *ControlleeExpectations) isExpired() bool {
	return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
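The 5-minute window is the ExpectationsTimeout constant defined in the same file:

// pkg/controller/controller_utils.go (comment paraphrased): after this TTL,
// unmet expectations are treated as expired so the controller can make progress.
const ExpectationsTimeout = 5 * time.Minute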
The core processing method compares the number of Pods the ReplicaSet desires with the number that currently exist, then creates or deletes Pods based on the result until the two counts match. With respect to expectations, in our example:
(1) before creating Pods, rsc.expectations.ExpectCreations is called to set the expectations to (key, add: 500, del: 0);
(2) slowStartBatch is called to carry out the Pod creation;
(3) after the creations, any Pods that failed to be created are counted, and rsc.expectations.CreationObserved is called once per failed Pod to subtract from the add count of the expectations.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	...
	if diff > rsc.burstReplicas {
		diff = rsc.burstReplicas
	}
	rsc.expectations.ExpectCreations(rsKey, diff)
	klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
	successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
		...
	})
	if skippedPods := diff - successfulCreations; skippedPods > 0 {
		klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
		for i := 0; i < skippedPods; i++ {
			// Decrement the expected number of creates because the informer won't observe this pod
			rsc.expectations.CreationObserved(rsKey)
		}
	}
	...
After the replicaset controller creates the first 500 Pods, its watch on Pod add events fires for each new Pod and calls rsc.expectations.CreationObserved, decrementing the expected-creation count by 1; in rsc.manageReplicas, CreationObserved is likewise called once per Pod that failed to be created. Eventually the ReplicaSet's expectations become (key, add: 0, del: 0), so the next reconcile of this ReplicaSet can proceed with the second batch of 500 creations.
rsc.manageReplicas: if rsc.expectations.SatisfiedExpectations(key) && rs.DeletionTimestamp == nil, check whether Pods still need to be created or deleted
diff = len(filteredPods) - rs.Spec.Replicas
- if diff < 0, more Pods need to be created
  - creationCount = min(-diff, rsc.burstReplicas)
  - rsc.expectations.ExpectCreations(rsKey, creationCount)
  - slowStartBatch: create the Pods in batches, returning the number successfully created, successfulCreations
  - skippedPods = creationCount - successfulCreations: the Pods that were never created must be subtracted from the expectations via rsc.expectations.CreationObserved(rsKey)
- if diff > 0, Pods need to be deleted
  - deletionCount = min(diff, rsc.burstReplicas)
  - podsToDelete = getPodsToDelete(filteredPods, deletionCount): prefer deleting unhealthy Pods
  - rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
  - iterate over podsToDelete, deleting each Pod and updating the expectations: rsc.expectations.DeletionObserved(rsKey, podKey)
- calculateStatus: compute the new rs.Status
  - Replicas: len(filteredPods)
  - FullyLabeledReplicas: the number of Pods whose pod.Labels match rs.Spec.Template.Labels
  - ReadyReplicas: the number of Pods for which GetPodCondition(pod.Status, v1.PodReady).Status == v1.ConditionTrue
  - AvailableReplicas: the number of Pods that have been Ready for at least rs.Spec.MinReadySeconds
- rsc.queue.AddAfter: re-enqueue the rs after a delay of updatedRS.Spec.MinReadySeconds seconds
rsc.expectations.ExpectCreations
Sets the ReplicaSet object's expectations.
// pkg/controller/controller_utils.go
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
	return r.SetExpectations(controllerKey, adds, 0)
}

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
	klog.V(4).Infof("Setting expectations %#v", exp)
	return r.Add(exp)
}
rsc.expectations.CreationObserved
Decrements the expected-creation count in the ReplicaSet's expectations by 1.
// pkg/controller/controller_utils.go
// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 1, 0)
}

// Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(-add), int64(-del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Lowered expectations %#v", exp)
	}
}
So under normal circumstances (i.e. no Pod creation failures), when do the expectations become (key, add: 0, del: 0)? Read on.
Pod add event handler: addPod
The replicaset controller watches Pod add events. Every successfully created Pod triggers addPod, which also calls rsc.expectations.CreationObserved once, decrementing the expected-creation count by 1.
// pkg/controller/replicaset/replica_set.go
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	...
	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
		if rs == nil {
			return
		}
		rsKey, err := controller.KeyFunc(rs)
		if err != nil {
			return
		}
		klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
		rsc.expectations.CreationObserved(rsKey)
		rsc.queue.Add(rsKey)
		return
	}
	...
}
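For completeness, the deletePod handler is symmetric: it lowers the deletion expectation for the owning ReplicaSet before re-enqueueing it. A trimmed sketch of its happy path, paraphrased from the same file (the real handler additionally unwraps cache.DeletedFinalStateUnknown tombstones):

// When a pod is deleted, record the observed deletion in the expectations
// and enqueue the owning replica set (paraphrased sketch).
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// tombstone unwrapping elided
		return
	}
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		return
	}
	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
	if rs == nil {
		return
	}
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		return
	}
	rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
	rsc.queue.Add(rsKey)
}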
rsc.manageReplicas
This is the core Pod create/delete method: it compares the number of Pods the ReplicaSet desires with the number that currently exist and creates or deletes Pods until they match. Note in particular that each call to rsc.manageReplicas creates or deletes at most 500 Pods.
During a reconcile, rsc.manageReplicas does not necessarily execute; it is called only when rsc.expectations.SatisfiedExpectations returns true and the ReplicaSet's DeletionTimestamp is empty.
diff = existing Pod count - desired Pod count
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
(1) If there are fewer existing Pods than desired, Pods must be created, and the creation code block runs.
(2) If there are more existing Pods than desired, Pods must be deleted, and the deletion code block runs.
The number of Pods created or deleted in one sync is capped at rsc.burstReplicas, i.e. 500.
// pkg/controller/replicaset/replica_set.go
const (
	// Realistic value of the burstReplica field for the replica set manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// The number of times we retry updating a ReplicaSet's status.
	statusUpdateRetries = 1
)

if diff > rsc.burstReplicas {
	diff = rsc.burstReplicas
}
Next, let's analyze the creation and deletion code blocks.
Pod creation code block
Main logic:
(1) compute the number of Pods that need to be created, capped at 500;
(2) call rsc.expectations.ExpectCreations to record in expectations the number of Pods this reconcile expects to create;
(3) call slowStartBatch to carry out the Pod creation;
(4) when slowStartBatch returns, compute how many Pods failed to be created and call rsc.expectations.CreationObserved that many times, subtracting them from this reconcile's expected creations.
Why subtract? Because expectations records how many Pod creations/deletions the ReplicaSet expects from one reconcile. When a Pod is actually created/deleted, the replicaset controller watches the corresponding event and calls rsc.expectations.CreationObserved to decrement the expected count. For Pods that failed to be created/deleted, no such event will ever arrive, so the expected count must be decremented directly; otherwise this reconcile's expected count could never drop to zero, and rsc.expectations.SatisfiedExpectations would only return true once the expectations timed out.
diff *= -1
if diff > rsc.burstReplicas {
	diff = rsc.burstReplicas
}
rsc.expectations.ExpectCreations(rsKey, diff)
glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
	boolPtr := func(b bool) *bool { return &b }
	controllerRef := &metav1.OwnerReference{
		APIVersion:         rsc.GroupVersion().String(),
		Kind:               rsc.Kind,
		Name:               rs.Name,
		UID:                rs.UID,
		BlockOwnerDeletion: boolPtr(true),
		Controller:         boolPtr(true),
	}
	err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
	if err != nil && errors.IsTimeout(err) {
		// Pod is created but its initialization has timed out.
		// If the initialization is successful eventually, the
		// controller will observe the creation via the informer.
		// If the initialization fails, or if the pod keeps
		// uninitialized for a long time, the informer will not
		// receive any update, and the controller will create a new
		// pod when the expectation expires.
		return nil
	}
	return err
})
if skippedPods := diff - successfulCreations; skippedPods > 0 {
	glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
	for i := 0; i < skippedPods; i++ {
		// Decrement the expected number of creates because the informer won't observe this pod
		rsc.expectations.CreationObserved(rsKey)
	}
}
return err
slowStartBatch
Looking at slowStartBatch, the Pod creation algorithm is:
(1) Pods are created in successive batches of 1, 2, 4, 8, ..., growing exponentially, with as many goroutines as there are Pods in the batch doing the creating;
(2) if any creation in a batch fails (e.g. the apiserver rate-limits and drops the request; timeouts are the exception, since initialization may legitimately time out), the remaining batches are skipped and the call returns.
// pkg/controller/replicaset/replica_set.go
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}
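To make the batch progression concrete, here is a small standalone simulation of the same loop (our own code, not controller source): with count=500 and initialBatchSize=1 (the value of controller.SlowStartInitialBatchSize), the batch sizes come out as 1, 2, 4, ..., 128, 245, and a failure cuts the run short.

package main

import "fmt"

// simulateSlowStart mirrors slowStartBatch's batching loop, minus the
// goroutines: it returns the batch sizes used and the number of successful
// calls. failAt < 0 means every call succeeds; otherwise the call with
// index failAt (0-based) fails, its batch finishes, and later batches are skipped.
func simulateSlowStart(count, initialBatchSize, failAt int) (batches []int, successes int) {
	min := func(a, b int) int {
		if a < b {
			return a
		}
		return b
	}
	remaining := count
	for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
		batches = append(batches, batchSize)
		if failAt >= 0 && successes+batchSize > failAt {
			// the failing batch still runs to completion; every call in it
			// except the failing one succeeds, then the loop stops
			successes += batchSize - 1
			return batches, successes
		}
		successes += batchSize
		remaining -= batchSize
	}
	return batches, successes
}

func main() {
	b, n := simulateSlowStart(500, 1, -1) // no failures
	fmt.Println(b, n)                     // [1 2 4 8 16 32 64 128 245] 500

	b, n = simulateSlowStart(500, 1, 10) // call #10 fails, inside the batch of 8
	fmt.Println(b, n)                    // [1 2 4 8] 14
}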
rsc.podControl.CreatePodsWithControllerRef
The creation method wired in earlier is rsc.podControl.CreatePodsWithControllerRef.
func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	if err := validateControllerRef(controllerRef); err != nil {
		return err
	}
	return r.createPods("", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	pod, err := GetPodFromTemplate(template, object, controllerRef)
	if err != nil {
		return err
	}
	if len(nodeName) != 0 {
		pod.Spec.NodeName = nodeName
	}
	if len(labels.Set(pod.Labels)) == 0 {
		return fmt.Errorf("unable to create pods, no labels")
	}
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
	if err != nil {
		// only send an event if the namespace isn't terminating
		if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
		}
		return err
	}
	accessor, err := meta.Accessor(object)
	if err != nil {
		klog.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}
	klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
	return nil
}
Pod deletion code block
Main logic:
(1) compute the number of Pods that need to be deleted, capped at 500;
(2) based on the scale-down count, call getPodsToDelete to pick the list of Pods to delete;
(3) call rsc.expectations.ExpectDeletions to record in expectations the Pods this reconcile expects to delete;
(4) spawn one goroutine per Pod to delete, each calling rsc.podControl.DeletePod;
(5) for every Pod whose deletion fails, call rsc.expectations.DeletionObserved to subtract it from this reconcile's expected deletions.
The reason for subtracting is the same as explained for the creation code block above.
(6) wait for all goroutines to finish, then return.
if diff > rsc.burstReplicas {
	diff = rsc.burstReplicas
}
glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

// Choose which Pods to delete, preferring those in earlier phases of startup.
podsToDelete := getPodsToDelete(filteredPods, diff)

rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

errCh := make(chan error, diff)
var wg sync.WaitGroup
wg.Add(diff)
for _, pod := range podsToDelete {
	go func(targetPod *v1.Pod) {
		defer wg.Done()
		if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
			// Decrement the expected number of deletes because the informer won't observe this deletion
			podKey := controller.PodKey(targetPod)
			glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
			rsc.expectations.DeletionObserved(rsKey, podKey)
			errCh <- err
		}
	}(pod)
}
wg.Wait()

select {
case err := <-errCh:
	// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
	if err != nil {
		return err
	}
default:
}
getPodsToDelete
getPodsToDelete: given the scale-down count, returns the list of Pods to delete. (The definition excerpted below also takes a relatedPods argument, used to rank Pods by how many active Pods of the same ReplicaSet share their node; the two-argument call shown in the snippet above comes from an older release of the same function.)
// pkg/controller/replicaset/replica_set.go
func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
	// No need to sort pods if we are about to delete all of them.
	// diff will always be <= len(filteredPods), so not need to handle > case.
	if diff < len(filteredPods) {
		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
		sort.Sort(podsWithRanks)
	}
	return filteredPods[:diff]
}

func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
	podsOnNode := make(map[string]int)
	for _, pod := range relatedPods {
		if controller.IsPodActive(pod) {
			podsOnNode[pod.Spec.NodeName]++
		}
	}
	ranks := make([]int, len(podsToRank))
	for i, pod := range podsToRank {
		ranks[i] = podsOnNode[pod.Spec.NodeName]
	}
	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
}
How Pods are chosen for deletion
Pods are ranked by the following rules, evaluated top to bottom; the rules are mutually exclusive, and ranking stops at the first rule that applies:
(1) Pods not yet bound to a node are deleted first;
(2) Pods in the Pending phase are deleted first, then Unknown, and Running last;
(3) not-ready Pods are deleted before ready Pods;
(4) Pods are ranked by how many of the ReplicaSet's Pods share their node, and Pods on nodes with more colocated ReplicaSet Pods are deleted first;
(5) Pods are ranked by how long they have been ready, and the most recently ready Pods are deleted first;
(6) Pods whose containers have higher restart counts are deleted first;
(7) Pods are ranked by creation time, and the most recently created Pods are deleted first.
// pkg/controller/controller_utils.go
func (s ActivePodsWithRanks) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
		return len(s.Pods[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
		return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
		return !podutil.IsPodReady(s.Pods[i])
	}
	// 4. Doubled up < not doubled up
	// If one of the two pods is on the same node as one or more additional
	// ready pods that belong to the same replicaset, whichever pod has more
	// colocated ready pods is less
	if s.Rank[i] != s.Rank[j] {
		return s.Rank[i] > s.Rank[j]
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 5. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
		readyTime1 := podReadyTime(s.Pods[i])
		readyTime2 := podReadyTime(s.Pods[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime1, readyTime2)
		}
	}
	// 6. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
		return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
	}
	// 7. Empty creation time pods < newer pods < older pods
	if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
		return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
	}
	return false
}
rsc.podControl.DeletePod
The method that deletes a Pod.
// pkg/controller/controller_utils.go
func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
	accessor, err := meta.Accessor(object)
	if err != nil {
		return fmt.Errorf("object does not have ObjectMeta, %v", err)
	}
	klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
	if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil && !apierrors.IsNotFound(err) {
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
		return fmt.Errorf("unable to delete pods: %v", err)
	}
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
	return nil
}
calculateStatus
The calculateStatus function computes and returns the ReplicaSet object's new status.
How is the status computed?
(1) From the existing Pod count, the count of Ready Pods, the count of available Pods, and so on, it fills in the status fields Replicas, ReadyReplicas, AvailableReplicas, etc.;
(2) based on the conditions already present in the status and on whether the earlier rsc.manageReplicas call returned an error, it decides whether to add or remove a condition of type ReplicaFailure:
when rsc.manageReplicas returned an error and the status has no ReplicaFailure condition, one is added, indicating that creating/deleting Pods for this ReplicaSet failed;
when rsc.manageReplicas returned no error and the status still has a ReplicaFailure condition, it is removed, indicating that creating/deleting Pods now succeeds.
func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
	newStatus := rs.Status
	// Count the number of pods that have labels matching the labels of the pod
	// template of the replica set, the matching pods may have more
	// labels than are in the template. Because the label of podTemplateSpec is
	// a superset of the selector of the replica set, so the possible
	// matching pods must be part of the filteredPods.
	fullyLabeledReplicasCount := 0
	readyReplicasCount := 0
	availableReplicasCount := 0
	templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
	for _, pod := range filteredPods {
		if templateLabel.Matches(labels.Set(pod.Labels)) {
			fullyLabeledReplicasCount++
		}
		if podutil.IsPodReady(pod) {
			readyReplicasCount++
			if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
				availableReplicasCount++
			}
		}
	}

	failureCond := GetCondition(rs.Status, apps.ReplicaSetReplicaFailure)
	if manageReplicasErr != nil && failureCond == nil {
		var reason string
		if diff := len(filteredPods) - int(*(rs.Spec.Replicas)); diff < 0 {
			reason = "FailedCreate"
		} else if diff > 0 {
			reason = "FailedDelete"
		}
		cond := NewReplicaSetCondition(apps.ReplicaSetReplicaFailure, v1.ConditionTrue, reason, manageReplicasErr.Error())
		SetCondition(&newStatus, cond)
	} else if manageReplicasErr == nil && failureCond != nil {
		RemoveCondition(&newStatus, apps.ReplicaSetReplicaFailure)
	}

	newStatus.Replicas = int32(len(filteredPods))
	newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
	newStatus.ReadyReplicas = int32(readyReplicasCount)
	newStatus.AvailableReplicas = int32(availableReplicasCount)
	return newStatus
}
updateReplicaSetStatus
Main logic:
(1) compare each field of the newly computed status (Replicas, ReadyReplicas, AvailableReplicas, Conditions, ...) with the ReplicaSet's current status; if they all match, no update is needed and the function returns immediately;
(2) otherwise, call c.UpdateStatus to update the ReplicaSet's status.
// pkg/controller/replicaset/replica_set_utils.go
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
	// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
	// we do a periodic relist every 30s. If the generations differ but the replicas are
	// the same, a caller might've resized to the same replica count.
	if rs.Status.Replicas == newStatus.Replicas &&
		rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
		rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
		rs.Generation == rs.Status.ObservedGeneration &&
		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
		return rs, nil
	}

	// Save the generation number we acted on, otherwise we might wrongfully indicate
	// that we've seen a spec update when we retry.
	// TODO: This can clobber an update if we allow multiple agents to write to the
	// same status.
	newStatus.ObservedGeneration = rs.Generation

	var getErr, updateErr error
	var updatedRS *apps.ReplicaSet
	for i, rs := 0, rs; ; i++ {
		klog.V(4).Infof(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
			fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
			fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
			fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
			fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
			fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))

		rs.Status = newStatus
		updatedRS, updateErr = c.UpdateStatus(rs)
		if updateErr == nil {
			return updatedRS, nil
		}
		// Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
		if i >= statusUpdateRetries {
			break
		}
		// Update the ReplicaSet with the latest resource version for the next poll
		if rs, getErr = c.Get(rs.Name, metav1.GetOptions{}); getErr != nil {
			// If the GET fails we can't trust status.Replicas anymore. This error
			// is bound to be more interesting than the update failure.
			return nil, getErr
		}
	}
	return nil, updateErr
}
c.UpdateStatus
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/replicaset.go
func (c *replicaSets) UpdateStatus(replicaSet *v1.ReplicaSet) (result *v1.ReplicaSet, err error) {
	result = &v1.ReplicaSet{}
	err = c.client.Put().
		Namespace(c.ns).
		Resource("replicasets").
		Name(replicaSet.Name).
		SubResource("status").
		Body(replicaSet).
		Do().
		Into(result)
	return
}
Summary
Core processing logic of the replicaset controller
The core logic of the replicaset controller is to compare the number of Pods the ReplicaSet desires against the number that currently exist. When the desired count is higher, the Pod creation algorithm creates new Pods until the desired count is reached; when the desired count is lower, the Pod deletion algorithm sorts the existing Pods by a fixed policy, picks the surplus Pods in order, and deletes them until the desired count is reached.
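As a compact illustration of that comparison, here is a toy model (our own code, not the controller source) of what one reconcile decides to do, including the 500-Pod cap; positive return values mean "create this many", negative mean "delete this many":

package main

import "fmt"

const burstReplicas = 500

// decide mirrors the branch logic of manageReplicas: given the existing and
// desired pod counts, it returns how many pods this reconcile will create
// (>0) or delete (<0), capped at burstReplicas per reconcile.
func decide(existing, desired int) int {
	diff := existing - desired
	if diff < 0 { // too few pods: create
		if -diff > burstReplicas {
			return burstReplicas
		}
		return -diff
	}
	if diff > burstReplicas { // too many pods: delete
		diff = burstReplicas
	}
	return -diff
}

func main() {
	fmt.Println(decide(0, 1000))    // 500: first batch of creations
	fmt.Println(decide(500, 1000))  // 500: second batch, in the next reconcile
	fmt.Println(decide(1200, 1000)) // -200: delete 200 pods
}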
The replicaset controller's Pod creation algorithm
Pods are created in multiple batches that grow as 1, 2, 4, 8, ... (with at most 500 Pods created per reconcile; the excess is created in the next reconcile). If any creation in a batch fails (e.g. the apiserver rate-limits and drops requests; timeouts excepted, since initialization may time out), the remaining batches are skipped, and creation resumes only when the ReplicaSet is next reconciled, repeating until the desired count is reached.
The replicaset controller's Pod deletion algorithm
For deletion, the existing Pods are first sorted by a fixed policy, the required number are picked in order, and one goroutine per Pod is spawned to delete them (at most 500 Pods deleted per reconcile), waiting for all goroutines to complete. Pods whose deletion fails (e.g. the apiserver rate-limits and drops requests), or that fall beyond the 500 cap, are handled when the ReplicaSet is next reconciled, repeating until the desired count is reached.
How Pods are chosen for deletion
Pods are ranked by the following rules, evaluated top to bottom; the rules are mutually exclusive, and ranking stops at the first rule that applies:
(1) Pods not yet bound to a node are deleted first;
(2) Pods in the Pending phase are deleted first, then Unknown, and Running last;
(3) not-ready Pods are deleted before ready Pods;
(4) Pods are ranked by how many of the ReplicaSet's Pods share their node, and Pods on nodes with more colocated ReplicaSet Pods are deleted first;
(5) Pods are ranked by how long they have been ready, and the most recently ready Pods are deleted first;
(6) Pods whose containers have higher restart counts are deleted first;
(7) Pods are ranked by creation time, and the most recently created Pods are deleted first.
Summary of the expectations mechanism
The expiry mechanism in expectations prevents a ReplicaSet from being permanently stuck short of its desired replica count when one batch of creations/deletions stalls for some reason and never completes.
When expectations.SatisfiedExpectations returns true, the core method rsc.manageReplicas runs, compares the desired Pod count with the existing count, and decides whether the next batch of creations/deletions is needed.
In short, expectations serialize the create/delete batches rather than letting them run concurrently. This prevents the problems concurrency would cause, such as deleting the same Pod twice or creating more Pods than the ReplicaSet desires: if a create/delete batch were still in flight and another reconcile started creating/deleting Pods without the expectations gate, exactly those problems would occur.