k8s之replicaset controller源码简单分析

简介

replicaset controller 通过 informer 监听 apiserver 上 ReplicaSet 和 Pod 的变化,然后对每个 ReplicaSet 检查实际 pod 数量与期望副本数的差值:

diff := len(filteredPods) - int(*(rs.Spec.Replicas))

再根据 diff 的结果增加 pod(diff < 0)或者删除 pod(diff > 0)。

创建replicaset处

pkg/registry/apps/replicaset/storage/storage.go中

// NewStorage assembles the ReplicaSetStorage bundle: the main ReplicaSet REST
// storage, its status storage, and a scale storage that shares the main
// storage's Store. If NewREST fails, its error is returned together with an
// empty ReplicaSetStorage.
func NewStorage(optsGetter generic.RESTOptionsGetter) (ReplicaSetStorage, error) {
    var storage ReplicaSetStorage

    rsREST, statusREST, err := NewREST(optsGetter)
    if err != nil {
        return storage, err
    }

    storage.ReplicaSet = rsREST
    storage.Status = statusREST
    storage.Scale = &ScaleREST{store: rsREST.Store}
    return storage, nil
}

// NewREST creates the generic registry Store for ReplicaSets and returns a
// REST wrapper around it plus a StatusREST wrapper around a copy of that
// Store whose UpdateStrategy is replaced by replicaset.StatusStrategy.
// An error from Store.CompleteWithOptions is returned with nil storages.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    tableGenerator := printers.NewTableGenerator().With(printersinternal.AddHandlers)

    store := &genericregistry.Store{
        NewFunc: func() runtime.Object {
            return &apps.ReplicaSet{}
        },
        NewListFunc: func() runtime.Object {
            return &apps.ReplicaSetList{}
        },
        PredicateFunc:            replicaset.MatchReplicaSet,
        DefaultQualifiedResource: apps.Resource("replicasets"),

        CreateStrategy: replicaset.Strategy,
        UpdateStrategy: replicaset.Strategy,
        DeleteStrategy: replicaset.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: tableGenerator},
    }

    storeOpts := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    replicaset.GetAttrs,
    }
    err := store.CompleteWithOptions(storeOpts)
    if err != nil {
        return nil, nil, err
    }

    // The status storage works on a value copy of the completed store, so
    // swapping its update strategy does not affect the main store.
    statusCopy := *store
    statusCopy.UpdateStrategy = replicaset.StatusStrategy

    mainREST := &REST{store, []string{"all"}}
    statusREST := &StatusREST{store: &statusCopy}
    return mainREST, statusREST, nil
}

controller

cmd/kube-controller-manager/app/apps.go中

// startReplicaSetController launches the ReplicaSet controller in its own
// goroutine when the apps/v1 "replicasets" resource is marked available in
// ctx.AvailableResources. The boolean result reports whether the controller
// was started; the handler and error results are always nil.
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    rsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
    if available := ctx.AvailableResources[rsResource]; !available {
        return nil, false, nil
    }

    rsController := replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    )
    workers := int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)

    // The controller is built synchronously; only Run is moved to a goroutine.
    go rsController.Run(workers, ctx.Stop)
    return nil, true, nil
}

pkg/controller/replicaset/replica_set.go中

// NewReplicaSetController wires an event broadcaster (structured logging plus
// recording to the client's core/v1 Events sink) and a RealPodControl, then
// delegates to NewBaseController with the ReplicaSet GroupVersionKind, the
// metric owner name "replicaset_controller" and the queue name "replicaset".
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartStructuredLogging(0)
    eventSink := &v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}
    broadcaster.StartRecordingToSink(eventSink)

    rsKind := apps.SchemeGroupVersion.WithKind("ReplicaSet")
    podControl := controller.RealPodControl{
        KubeClient: kubeClient,
        Recorder:   broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
    }

    return NewBaseController(
        rsInformer, podInformer, kubeClient, burstReplicas,
        rsKind,
        "replicaset_controller",
        "replicaset",
        podControl,
    )
}

// NewBaseController builds a ReplicaSetController for the given kind: it
// registers the client's rate limiter for metrics (when both the client and
// its limiter are non-nil), creates the expectations tracker and the named
// rate-limited work queue, subscribes to ReplicaSet and Pod informer events,
// stores the listers and their HasSynced functions, and installs
// syncReplicaSet as the sync handler.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil {
        if limiter := kubeClient.CoreV1().RESTClient().GetRateLimiter(); limiter != nil {
            ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, limiter)
        }
    }

    rsc := new(ReplicaSetController)
    rsc.GroupVersionKind = gvk
    rsc.kubeClient = kubeClient
    rsc.podControl = podControl
    rsc.burstReplicas = burstReplicas
    rsc.expectations = controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations())
    rsc.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)

    // ReplicaSet events.
    rsHandlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    }
    rsInformer.Informer().AddEventHandler(rsHandlers)
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    // Pod events. The update handler fires the ReplicaSet for every pod
    // change, e.g. host assignment. That may look excessive, but the most
    // frequent pod update is status and the associated ReplicaSet only lists
    // from local storage, so it should be ok.
    podHandlers := cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    }
    podInformer.Informer().AddEventHandler(podHandlers)
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

// Run waits for the pod and ReplicaSet caches to sync, starts `workers`
// goroutines that repeatedly run rsc.worker (restarted one second after it
// returns, until stopCh closes), and then blocks until stopCh is closed.
// If the caches never sync it returns without starting any worker. The work
// queue is shut down on return.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    // HandleCrash and ShutDown must stay direct defers: they run in reverse
    // order on return, after the "Shutting down" log line.
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    name := strings.ToLower(rsc.Kind)
    klog.Infof("Starting %v controller", name)
    defer klog.Infof("Shutting down %v controller", name)

    synced := cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced)
    if synced {
        for started := 0; started < workers; started++ {
            go wait.Until(rsc.worker, time.Second, stopCh)
        }

        <-stopCh
    }
}

// manageReplicas reconciles the number of pods in filteredPods with the count
// requested in rs.Spec.Replicas by creating or deleting pods. At most
// rsc.burstReplicas pods are created or deleted per call, and the planned
// creations/deletions are registered in rsc.expectations before any API call.
//
// It returns the error from the batched creation, or one deletion error read
// from the error channel; otherwise nil. A failure to compute the ReplicaSet
// key is only reported through utilruntime.HandleError and yields nil.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // diff < 0: too few pods; diff > 0: too many pods; diff == 0: nothing to do.
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    if diff < 0 {
        // Turn the deficit into a positive count and cap it at the burst limit.
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails.  Conveniently, this also prevents the
        // event spam that those failures would generate.
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                    // if the namespace is being terminated, we don't have to do
                    // anything because any creation will fail
                    return nil
                }
            }
            return err
        })

        // Any skipped pods that we never attempted to start shouldn't be expected.
        // The skipped pods will be retried later. The next controller resync will
        // retry the slow start process.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        // Cap the number of deletions at the burst limit.
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)

        // A failure to look up related pods is reported but does not abort
        // the deletion; relatedPods is passed on as returned.
        relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
        utilruntime.HandleError(err)

        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rs change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        // One goroutine per pod to delete; each sends at most one error.
        // NOTE(review): the channel capacity and wg.Add(diff) both assume
        // getPodsToDelete returns exactly diff pods — confirm against its
        // implementation, otherwise wg.Wait could block or panic.
        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    // A NotFound error is not reported: only other errors
                    // are logged and forwarded to the caller.
                    if !apierrors.IsNotFound(err) {
                        klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                        errCh <- err
                    }
                }
            }(pod)
        }
        wg.Wait()

        // Non-blocking read: return one collected error if any, else fall through.
        select {
        case err := <-errCh:
            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
            if err != nil {
                return err
            }
        default:
        }
    }

    return nil
}

// worker drains the work queue: it keeps calling processNextWorkItem and
// returns as soon as that call reports false.
func (rsc *ReplicaSetController) worker() {
    for {
        if !rsc.processNextWorkItem() {
            return
        }
    }
}

// processNextWorkItem takes one key from the queue and runs the sync handler
// on it. On success the key is forgotten by the rate limiter; on failure the
// error is reported and the key is re-added with rate limiting. It returns
// false only when the queue signals shutdown.
func (rsc *ReplicaSetController) processNextWorkItem() bool {
    item, shutdown := rsc.queue.Get()
    if shutdown {
        return false
    }
    defer rsc.queue.Done(item)

    if syncErr := rsc.syncHandler(item.(string)); syncErr != nil {
        utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", item, syncErr))
        rsc.queue.AddRateLimited(item)
    } else {
        rsc.queue.Forget(item)
    }

    return true
}

// syncReplicaSet reconciles the ReplicaSet identified by key, which
// cache.SplitMetaNamespaceKey splits into a namespace and a name.
//
// Flow, as implemented below:
//   - look the ReplicaSet up in the lister; if it is gone, drop its
//     expectations and return nil;
//   - list every pod in the namespace, keep the active ones and let
//     claimPods decide which belong to this ReplicaSet;
//   - if expectations are satisfied and the ReplicaSet is not being deleted,
//     call manageReplicas to create/delete pods;
//   - compute and write the new status, and optionally schedule a delayed
//     resync based on MinReadySeconds.
//
// It returns the first lookup/list/claim/status-update error encountered,
// otherwise the error (possibly nil) from manageReplicas. An invalid label
// selector is only reported via utilruntime.HandleError and yields nil.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }

    // Evaluated up front; it gates the call to manageReplicas further down.
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
        return nil
    }

    // List all pods, including those that no longer match the rs's selector
    // but still carry a stale controller ref.
    // TODO: Do the List and Filter in a single pass, or use an index.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    filteredPods := controller.FilterActivePods(allPods)

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }

    // Pods are only created/deleted when expectations are satisfied and the
    // ReplicaSet itself is not marked for deletion.
    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    // Work on a copy from here on: updateReplicaSetStatus assigns rs.Status,
    // and rs was obtained from the lister (presumably a shared cache object).
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

    // Always updates status as pods come up or die.
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        // Multiple things could lead to this update failing. Returning the
        // error lets the worker re-add the key with rate limiting, which
        // retries without forcing a hotloop.
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
    // This applies only when all replicas are ready but not all are available yet.
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}

pkg/controller/replicaset/replica_set_utils.go中

// updateReplicaSetStatus writes newStatus to the ReplicaSet's status
// subresource through c, retrying on failure.
//
// If the replica counters, conditions and observed generation already match,
// no request is sent and rs is returned unchanged. Otherwise
// newStatus.ObservedGeneration is set to rs.Generation and UpdateStatus is
// attempted; after each failed attempt the ReplicaSet is re-fetched so the
// next attempt uses the latest object, up to statusUpdateRetries retries.
//
// Returns the updated ReplicaSet on success. On failure it returns nil and
// either the error from the re-fetch (Get) or the last UpdateStatus error.
// Note that the rs passed in has its Status overwritten by the first attempt.
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
    // This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
    // we do a periodic relist every 30s. If the generations differ but the replicas are
    // the same, a caller might've resized to the same replica count.
    if rs.Status.Replicas == newStatus.Replicas &&
        rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
        rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
        rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
        rs.Generation == rs.Status.ObservedGeneration &&
        reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
        return rs, nil
    }

    // Save the generation number we acted on, otherwise we might wrongfully indicate
    // that we've seen a spec update when we retry.
    // TODO: This can clobber an update if we allow multiple agents to write to the
    // same status.
    newStatus.ObservedGeneration = rs.Generation

    var getErr, updateErr error
    var updatedRS *apps.ReplicaSet
    // The loop-scoped rs shadows the parameter so that a re-fetched object
    // replaces it for the next attempt.
    for i, rs := 0, rs; ; i++ {
        // A single constant format string: the message text is unchanged, but
        // data is never interpreted as format verbs and nothing is formatted
        // by hand before the verbosity check.
        klog.V(4).Infof("Updating status for %v: %s/%s, replicas %d->%d (need %d), fullyLabeledReplicas %d->%d, readyReplicas %d->%d, availableReplicas %d->%d, sequence No: %v->%v",
            rs.Kind, rs.Namespace, rs.Name,
            rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas),
            rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas,
            rs.Status.ReadyReplicas, newStatus.ReadyReplicas,
            rs.Status.AvailableReplicas, newStatus.AvailableReplicas,
            rs.Status.ObservedGeneration, newStatus.ObservedGeneration)

        rs.Status = newStatus
        updatedRS, updateErr = c.UpdateStatus(context.TODO(), rs, metav1.UpdateOptions{})
        if updateErr == nil {
            return updatedRS, nil
        }
        // Stop retrying if we exceed statusUpdateRetries - the replicaSet will be requeued with a rate limit.
        if i >= statusUpdateRetries {
            break
        }
        // Update the ReplicaSet with the latest resource version for the next poll
        if rs, getErr = c.Get(context.TODO(), rs.Name, metav1.GetOptions{}); getErr != nil {
            // If the GET fails we can't trust status.Replicas anymore. This error
            // is bound to be more interesting than the update failure.
            return nil, getErr
        }
    }

    return nil, updateErr
}

k8s.io/client-go/kubernetes/typed/apps/v1/replicaset.go中

// UpdateStatus issues a PUT against the "status" subresource of the
// ReplicaSet named replicaSet.Name in the client's namespace, sending
// replicaSet as the request body and decoding the server's response into the
// returned object. The request is bound to ctx, so the caller's cancellation
// and deadline apply.
//
// NOTE(review): opts is not forwarded to the request in this excerpt; the
// upstream typed client presumably passes it as versioned query parameters —
// confirm against client-go before relying on UpdateOptions here.
func (c *replicaSets) UpdateStatus(ctx context.Context, replicaSet *v1.ReplicaSet, opts metav1.UpdateOptions) (result *v1.ReplicaSet, err error) {
    // result must have the declared return type (*v1.ReplicaSet).
    result = &v1.ReplicaSet{}
    err = c.client.Put().
        Namespace(c.ns).
        Resource("replicasets").
        Name(replicaSet.Name).
        SubResource("status").
        Body(replicaSet).
        Do(ctx).
        Into(result)
    return
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352