Continuing from the previous article: [k8s源码分析][controller-manager] ReplicaSetController(ReplicaSet)分析(1)
4.3 The Consumer
Having looked at the producers, let's now see how the consumer processes the keys in the queue.
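As a reminder, the worker goroutines started in Run simply keep popping keys off the queue and handing them to syncReplicaSet. Roughly, the consumer loop looks like this (a trimmed sketch of the controller's worker/processNextWorkItem; details are simplified):

func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    // Block until a key is available, then mark it as being processed.
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    // syncHandler is syncReplicaSet; on failure the key is requeued with rate limiting.
    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }
    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)
    return true
}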
syncReplicaSet
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()
    // Parse the key to get the replicaset's namespace and name.
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // Local cache (lister).
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        // The replicaset no longer exists, so delete its expectations directly.
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }
    // Decide whether we need to check that the desired replicas matches the number of pods that actually exist.
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    // Convert to a Selector.
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }
    // List all pods in this namespace.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    // Keep only the pods that are still active.
    var filteredPods []*v1.Pod
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }
    // Filter out the pods that belong to this replicaset; they end up in filteredPods.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }
    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        // Create or delete pods so that the desired replica count is met.
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Requeuing the replica set ensures
        // Returning an error causes a requeue without forcing a hotloop
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
syncReplicaSet does quite a lot; the straightforward parts are only skimmed over.
1. Parse the key to get the replicaset's namespace and name (see the small example after this list).
2. Fetch the replicaset to be processed from the local cache by namespace and name. If it no longer exists, delete the corresponding key from expectations.
3. Use the SatisfiedExpectations method to decide whether a sync is needed.
4. Find the pods that belong to this replicaset and store them in filteredPods.
5. If a sync is needed, call manageReplicas to bring the number of owned pods in line with replicas.
6. Update the replicaset's status.
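As a small illustration of step 1, cache.SplitMetaNamespaceKey simply splits the "namespace/name" string that was produced when the object was enqueued (a minimal example; the key value here is made up):

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // Keys produced by MetaNamespaceKeyFunc have the form "namespace/name".
    namespace, name, err := cache.SplitMetaNamespaceKey("default/replicatest")
    if err != nil {
        panic(err)
    }
    fmt.Println(namespace, name) // prints: default replicatest
}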
claimPods
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
        fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        // Double-check that the replicaset has not changed in the meantime.
        if fresh.UID != rs.UID {
            return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
        }
        return fresh, nil
    })
    cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    return cm.ClaimPods(filteredPods)
}
1. First a canAdoptFunc is defined, and then a PodControllerRefManager object is built from the podControl used to operate on pods, the selector, and the other parameters.
2. The PodControllerRefManager's ClaimPods method is called to filter out, for this rs, the pods that belong to it.
PodControllerRefManager has already been covered in detail in [k8s源码分析][controller-manager] controller_ref_manager分析.
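For quick reference, the per-pod decision that ClaimPods ends up making can be summarized as below. This is only a simplified sketch of the decision (claimDecision is a made-up name; the real manager also issues the adopt/release patches against the api-server):

package sketch

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

// claimDecision sketches what ClaimPods decides for a single pod.
func claimDecision(rs *apps.ReplicaSet, selector labels.Selector, pod *v1.Pod) string {
    if ref := metav1.GetControllerOf(pod); ref != nil {
        if ref.UID != rs.UID {
            return "ignore: already controlled by another owner"
        }
        if selector.Matches(labels.Set(pod.Labels)) {
            return "keep: owned by this rs and still matches the selector"
        }
        return "release: owned by this rs but no longer matches the selector"
    }
    if selector.Matches(labels.Set(pod.Labels)) && pod.DeletionTimestamp == nil {
        return "adopt: matching orphan, patch its ownerReference to point at this rs"
    }
    return "ignore: orphan that does not match the selector"
}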
manageReplicas
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // Difference between the pods currently owned and the desired replicas: are there too many or too few?
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    if diff < 0 {
        // Not enough pods have been created yet.
        diff *= -1
        if diff > rsc.burstReplicas {
            // At most this many can be created in one round.
            diff = rsc.burstReplicas
        }
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // Create the pods in batches.
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            boolPtr := func(b bool) *bool { return &b }
            controllerRef := &metav1.OwnerReference{
                APIVersion:         rsc.GroupVersion().String(),
                Kind:               rsc.Kind,
                Name:               rs.Name,
                UID:                rs.UID,
                BlockOwnerDeletion: boolPtr(true),
                Controller:         boolPtr(true),
            }
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
            if err != nil && errors.IsTimeout(err) {
                return nil
            }
            return err
        })
        // skippedPods creations did not happen.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        podsToDelete := getPodsToDelete(filteredPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()
        select {
        case err := <-errCh:
            if err != nil {
                return err
            }
        default:
        }
    }
    return nil
}
The main job of this method is:
1. Compute the difference between the number of pods the replicaset currently owns and replicas.
2. If there are fewer pods than replicas, create diff more pods (the batching is sketched right after this list).
3. If there are more pods than replicas, delete diff pods.
4. expectations is analyzed in one place further below.
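The batch creation in step 2 goes through slowStartBatch, which starts with a small batch (SlowStartInitialBatchSize is 1), doubles the batch size after every fully successful batch, and gives up as soon as a batch reports an error, so a persistently failing pod template does not flood the api-server. A simplified sketch of that behavior (not a verbatim copy of the source; slowStartBatchSketch is a made-up name):

package sketch

import "sync"

// slowStartBatchSketch calls fn count times in batches that start at
// initialBatchSize and double after each fully successful batch. It returns
// how many calls succeeded and the first error encountered; once a batch
// fails, the remaining calls are skipped.
func slowStartBatchSketch(count, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            // A batch failed: stop here and report the first error.
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}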
calculateStatus
Computes the current status of the replicaset from filteredPods.
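Roughly speaking, it derives the counters below; this is a sketch of the counting part only (statusCountsSketch is a made-up name, and the real calculateStatus additionally sets or clears a ReplicaFailure condition based on manageReplicasErr):

package sketch

import (
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// statusCountsSketch mirrors the counters that calculateStatus derives from filteredPods.
func statusCountsSketch(rs *apps.ReplicaSet, filteredPods []*v1.Pod) apps.ReplicaSetStatus {
    newStatus := rs.Status
    templateSelector := labels.Set(rs.Spec.Template.Labels).AsSelector()

    var fullyLabeled, ready, available int32
    for _, pod := range filteredPods {
        if templateSelector.Matches(labels.Set(pod.Labels)) {
            fullyLabeled++ // pod carries all the labels of the pod template
        }
        if podutil.IsPodReady(pod) {
            ready++
            if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
                available++ // ready for at least MinReadySeconds
            }
        }
    }

    newStatus.Replicas = int32(len(filteredPods))
    newStatus.FullyLabeledReplicas = fullyLabeled
    newStatus.ReadyReplicas = ready
    newStatus.AvailableReplicas = available
    return newStatus
}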
4.4 Understanding expectations
In general, expectations either expect creations or expect deletions; it would not normally expect some number of creations and some number of deletions at the same time.
Take expected creations as an example:
1. The expectation is set in the manageReplicas method. At that point diff < 0 and diff more pods are expected, so rsc.expectations.ExpectCreations(rsKey, diff) is called to record how many creations are expected.
2. Once the expectation is set, podControl is used to create the pods, and there are two cases:
2.1 If the podControl creation call succeeds, the new pod eventually reaches the podInformer's AddPod method. When AddPod sees that the pod has an owner, it calls rsc.expectations.CreationObserved(rsKey), which decrements the corresponding expectation by one, because that creation has completed.
2.2 If the podControl creation call fails, manageReplicas itself calls rsc.expectations.CreationObserved(rsKey) once for each creation that never happened, decrementing the expectation. (If it were not decremented here, nothing else could decrement it, because the podInformer will never observe those pods.)
3. With point 2 understood, you can see why syncReplicaSet calls SatisfiedExpectations to decide whether to call manageReplicas: plainly speaking, it waits for one round of manageReplicas to finish. What counts as a round? When all the pods it asked to create have passed through the podInformer's AddPod method, that round is complete. Take a look at the concrete implementation of SatisfiedExpectations.
func (e *ControlleeExpectations) Fulfilled() bool {
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        // Are the expectations fulfilled?
        if exp.Fulfilled() {
            klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
            return true
        } else if exp.isExpired() {
            // The expectations have expired: we have already waited too long.
            klog.V(4).Infof("Controller expectations expired %#v", exp)
            return true
        } else {
            // Not all the pods have reached the podInformer's AddPod method yet.
            klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
            return false
        }
    } else if err != nil {
        klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
    } else {
        klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
    }
    // No expectations recorded, or an error occurred: of course we should sync.
    return true
}
With the explanation above in mind, the SatisfiedExpectations method should not be hard to understand.
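To tie it together, the lifecycle described above can be exercised directly against the ControllerExpectations type from k8s.io/kubernetes/pkg/controller (a minimal sketch; the key and the count are made up for illustration):

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    exp := controller.NewControllerExpectations()
    rsKey := "default/replicatest" // hypothetical key

    // manageReplicas decides it needs 3 new pods and records the expectation.
    exp.ExpectCreations(rsKey, 3)
    fmt.Println(exp.SatisfiedExpectations(rsKey)) // false: still waiting on 3 AddPod events

    // Each AddPod event (or each creation that manageReplicas knows never happened) lowers the counter by one.
    exp.CreationObserved(rsKey)
    exp.CreationObserved(rsKey)
    exp.CreationObserved(rsKey)
    fmt.Println(exp.SatisfiedExpectations(rsKey)) // true: the round is done, the next sync may call manageReplicas again
}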
5. The Problem in the Example
In the example at the beginning of this article, the pod that was created first is an orphan pod. In the claimPods method it matches the replicaset that was created afterwards, so that replicaset tries to adopt the pod and sends a patch request to the api-server to set the ownerReference field on that pod (test).
Unlike the test pod, the replicatest-rdjv6 pod already has the ownerReference field when it is created: as can be seen in the manageReplicas method, the OwnerReference is attached directly at creation time.
6. A Question to Leave You With
Create a replicaset, then suppose kube-controller-manager suddenly goes down for some reason. Delete the replicaset while it is down and observe what happens; then bring kube-controller-manager back up and see what happens next.
[root@master kubectl]# ./kubectl apply -f replicaset.yaml
replicaset.apps/replicatest created
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-9q9sm   1/1     Running   0          4s
replicatest-rdc8c   1/1     Running   0          4s
[root@master kubectl]# ./kubectl get rs
NAME          DESIRED   CURRENT   READY   AGE
replicatest   2         2         2       7s
Now stop the kube-controller-manager component and delete the replicaset; the corresponding pods are not deleted.
[root@master kubectl]# ./kubectl delete rs replicatest
replicaset.extensions "replicatest" deleted
[root@master kubectl]# ./kubectl get rs
No resources found.
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-9q9sm   1/1     Running   0          38s
replicatest-rdc8c   1/1     Running   0          38s
[root@master kubectl]#
Now start the kube-controller-manager component again, and you can see that the two pods get deleted.
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS        RESTARTS   AGE
replicatest-9q9sm   1/1     Running       0          2m48s
replicatest-rdc8c   1/1     Running       0          2m48s
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS        RESTARTS   AGE
replicatest-9q9sm   0/1     Terminating   0          2m49s
replicatest-rdc8c   0/1     Terminating   0          2m49s
[root@master kubectl]# ./kubectl get pods
No resources found.
[root@master kubectl]#
After kube-controller-manager comes back up, the deletion of these pods is performed by the garbagecollector controller.
7. Summary
1. Whenever a replicaset is added, updated, or deleted, it is put into the queue, where the controller decides whether it needs to be resynced.
2. Whenever a pod is added, updated, or deleted, the corresponding replicaset is likewise located and put into the queue to decide whether it needs to be resynced.
3. Syncing replicas simply means creating or deleting pods so that the number of pods currently owned by the replicaset is exactly equal to replicas; whether a sync is needed at all is controlled by the replicaset's expectations.