k8s: a brief analysis of the DaemonSet controller source code

Overview

The DaemonSet controller watches DaemonSet, Pod, and Node objects and creates or deletes daemon Pods accordingly, so that every eligible node runs exactly one copy of the daemon Pod.
The kubelet on each node then observes those Pod changes and performs the actual container creation or deletion.
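
To make that division of labor concrete, here is a toy illustration (not controller code) of the core rule the controller enforces: every node that matches the DaemonSet's node selector should run exactly one daemon Pod, and nodes that no longer match should have theirs removed. The node names, labels, and the disk=ssd selector below are made up for the example; the real per-node decision, including taints, node conditions, and failed-pod handling, happens in podsShouldBeOnNode, referenced further down.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Made-up cluster state: node labels and how many daemon pods each node runs.
	nodeLabels := map[string]map[string]string{
		"node-a": {"disk": "ssd"},
		"node-b": {"disk": "hdd"},
		"node-c": {"disk": "ssd"},
	}
	daemonPodsOnNode := map[string]int{"node-a": 1, "node-b": 1}

	// Hypothetical nodeSelector of the DaemonSet's pod template.
	selector := labels.SelectorFromSet(labels.Set{"disk": "ssd"})

	for node, nl := range nodeLabels {
		matches := selector.Matches(labels.Set(nl))
		switch {
		case matches && daemonPodsOnNode[node] == 0:
			fmt.Println("create daemon pod on", node) // node-c
		case !matches && daemonPodsOnNode[node] > 0:
			fmt.Println("delete daemon pod from", node) // node-b
		}
	}
}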

Source code

In cmd/kube-controller-manager/app/apps.go:

func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
        return nil, false, nil
    }
    dsc, err := daemon.NewDaemonSetsController(
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        ctx.InformerFactory.Apps().V1().ControllerRevisions(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
        flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
    }
    go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
    return nil, true, nil
}
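
startDaemonSetController only wires four informers, a client, and a failed-pod backoff into the constructor, then starts the controller with ConcurrentDaemonSetSyncs workers. For reference, the same wiring can be sketched as a standalone program; this is only an illustration (the kubeconfig path is hypothetical, and k8s.io/kubernetes/pkg/controller/daemon is not meant to be imported by out-of-tree code), whereas in reality kube-controller-manager supplies the shared informer factory and client through ControllerContext.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/kubernetes/pkg/controller/daemon"
)

func main() {
	// Hypothetical kubeconfig path; kube-controller-manager builds its client
	// via ctx.ClientBuilder instead.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// One shared informer factory provides all four informers the constructor needs.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	dsc, err := daemon.NewDaemonSetsController(
		factory.Apps().V1().DaemonSets(),
		factory.Apps().V1().ControllerRevisions(),
		factory.Core().V1().Pods(),
		factory.Core().V1().Nodes(),
		client,
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	factory.Start(stopCh) // start the informers the constructor registered handlers on
	go dsc.Run(2, stopCh) // 2 workers, i.e. ConcurrentDaemonSetSyncs
	select {}             // block forever; real code would handle signals
}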

In pkg/controller/daemon/daemon_controller.go:

func NewDaemonSetsController(
    daemonSetInformer appsinformers.DaemonSetInformer,
    historyInformer appsinformers.ControllerRevisionInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    kubeClient clientset.Interface,
    failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dsc := &DaemonSetsController{
        kubeClient:    kubeClient,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        },
        crControl: controller.RealControllerRevisionControl{
            KubeClient: kubeClient,
        },
        burstReplicas: BurstReplicas,
        expectations:  controller.NewControllerExpectations(),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
    }

    daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addDaemonset,
        UpdateFunc: dsc.updateDaemonset,
        DeleteFunc: dsc.deleteDaemonset,
    })
    dsc.dsLister = daemonSetInformer.Lister()
    dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

    historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addHistory,
        UpdateFunc: dsc.updateHistory,
        DeleteFunc: dsc.deleteHistory,
    })
    dsc.historyLister = historyInformer.Lister()
    dsc.historyStoreSynced = historyInformer.Informer().HasSynced

    // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
    // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addPod,
        UpdateFunc: dsc.updatePod,
        DeleteFunc: dsc.deletePod,
    })
    dsc.podLister = podInformer.Lister()

    // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
    podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
        "nodeName": indexByPodNodeName,
    })
    dsc.podNodeIndex = podInformer.Informer().GetIndexer()
    dsc.podStoreSynced = podInformer.Informer().HasSynced

    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addNode,
        UpdateFunc: dsc.updateNode,
    },
    )
    dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
    dsc.nodeLister = nodeInformer.Lister()

    dsc.syncHandler = dsc.syncDaemonSet
    dsc.enqueueDaemonSet = dsc.enqueue

    dsc.failedPodsBackoff = failedPodsBackoff

    return dsc, nil
}
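
Two details of the constructor are worth calling out. First, every informer gets add/update/delete handlers (addDaemonset, addPod, addNode, ...) whose job is essentially to enqueue the affected DaemonSet's namespace/name key onto the workqueue. Second, the pod informer gets a custom "nodeName" index so later code can look up the pods of a single node directly instead of scanning the whole cache. indexByPodNodeName itself is not quoted in this post; the sketch below shows the shape of such an index function (the real one also filters out pods in terminal phases) and how the index is queried.

package daemonsketch

import (
	v1 "k8s.io/api/core/v1"
)

// Sketch of a "nodeName" index function: map every scheduled Pod to the name
// of the node it is bound to, so the informer cache can be queried per node.
func indexByPodNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	if len(pod.Spec.NodeName) == 0 {
		return []string{}, nil // unscheduled pods are not indexed under any node
	}
	return []string{pod.Spec.NodeName}, nil
}

// Conceptual usage inside the controller:
//   objs, _ := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
//   // objs now contains only the pods bound to that node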

func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dsc.queue.ShutDown()

    klog.Infof("Starting daemon sets controller")
    defer klog.Infof("Shutting down daemon sets controller")

    if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dsc.runWorker, time.Second, stopCh)
    }

    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

    <-stopCh
}

func (dsc *DaemonSetsController) runWorker() {
    for dsc.processNextWorkItem() {
    }
}

// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
    dsKey, quit := dsc.queue.Get()
    if quit {
        return false
    }
    defer dsc.queue.Done(dsKey)

    err := dsc.syncHandler(dsKey.(string))
    if err == nil {
        dsc.queue.Forget(dsKey)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
    dsc.queue.AddRateLimited(dsKey)

    return true
}
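
A failed sync is requeued with AddRateLimited, while a successful sync calls Forget to reset the per-key backoff. With the DefaultControllerRateLimiter used in the constructor, retries for the same key back off exponentially. The sketch below shows that behavior in isolation; the key name is made up, and the 5ms base / 1000s cap are the client-go defaults at the time of writing.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.DefaultControllerRateLimiter()
	key := "kube-system/node-exporter" // hypothetical DaemonSet key

	// Each failed attempt roughly doubles the delay: 5ms, 10ms, 20ms, ...
	// (capped at 1000s by the default exponential limiter).
	for i := 0; i < 5; i++ {
		fmt.Printf("retry %d after %v\n", i+1, rl.When(key))
	}

	rl.Forget(key)                             // a successful sync resets the per-key backoff
	fmt.Println("after Forget:", rl.When(key)) // back to the base delay
}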


func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(3).Infof("daemon set has been deleted %v", key)
        dsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
    }

    nodeList, err := dsc.nodeLister.List(labels.Everything())
    if err != nil {
        return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
    }

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(ds.Spec.Selector, &everything) {
        dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
        return nil
    }

    // Don't process a daemon set until all its creations and deletions have been processed.
    // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
    // then we do not want to call manage on foo until the daemon pods have been created.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    // If the DaemonSet is being deleted (either by foreground deletion or
    // orphan deletion), we cannot be sure if the DaemonSet history objects
    // it owned still exist -- those history objects can either be deleted
    // or orphaned. Garbage collector doesn't guarantee that it will delete
    // DaemonSet pods before deleting DaemonSet history objects, because
    // DaemonSet history doesn't own DaemonSet pods. We cannot reliably
    // calculate the status of a DaemonSet being deleted. Therefore, return
    // here without updating status for the DaemonSet being deleted.
    if ds.DeletionTimestamp != nil {
        return nil
    }

    // Construct histories of the DaemonSet, and get the hash of current history
    cur, old, err := dsc.constructHistory(ds)
    if err != nil {
        return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
    }
    hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

    if !dsc.expectations.SatisfiedExpectations(dsKey) {
        // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
        return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
    }

    err = dsc.manage(ds, nodeList, hash)
    if err != nil {
        return err
    }

    // Process rolling updates if we're ready.
    if dsc.expectations.SatisfiedExpectations(dsKey) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    err = dsc.cleanupHistory(ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
    }

    return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
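
The expectations checked at the top of syncDaemonSet (and again before the rolling update) are the same ones syncNodes sets before creating or deleting pods: the controller records how many creations and deletions it has asked for and refuses to run manage again until the pod informer has observed them all (or the expectations expire). Below is a minimal sketch of that mechanism, importing the same controller package purely for illustration and using a made-up key.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller"
)

func main() {
	exp := controller.NewControllerExpectations()
	key := "kube-system/node-exporter" // hypothetical DaemonSet key

	// syncNodes: "I am about to create 2 pods and delete 0."
	exp.SetExpectations(key, 2, 0)
	fmt.Println(exp.SatisfiedExpectations(key)) // false: wait for the informer

	// addPod handler: the created pods show up in the watch stream.
	exp.CreationObserved(key)
	exp.CreationObserved(key)
	fmt.Println(exp.SatisfiedExpectations(key)) // true: safe to manage again
}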

func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // Find out the pods which are created for the nodes by DaemonSet.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds)

        if err != nil {
            continue
        }

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
    // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // Label new pods using the hash label value of the current history when creating them
    if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}
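
podsShouldBeOnNode is not quoted in this post. Roughly speaking, it asks nodeShouldRunDaemonPod whether the node should (still) run a daemon pod and compares that with the pods actually present on the node. The sketch below is a deliberate simplification of that decision, not the real function, which also deals with failed pods, backoff, and keeping at most one healthy pod per node.

package main

import "fmt"

// decide is a rough simplification of the per-node decision in podsShouldBeOnNode.
// shouldRun / shouldContinueRunning would come from nodeShouldRunDaemonPod in
// the real code (taints, node conditions, and so on).
func decide(nodeName string, shouldRun, shouldContinueRunning bool, daemonPods []string) (nodesNeedingDaemonPods, podsToDelete []string) {
	switch {
	case shouldRun && len(daemonPods) == 0:
		// The node should run the daemon pod but has none: create one there.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodeName)
	case !shouldContinueRunning && len(daemonPods) > 0:
		// The node must no longer run the daemon pod: delete the existing ones.
		podsToDelete = append(podsToDelete, daemonPods...)
	}
	return nodesNeedingDaemonPods, podsToDelete
}

func main() {
	create, del := decide("node-a", true, true, nil)
	fmt.Println(create, del) // [node-a] []
}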

func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    // We need to set expectations before creating/deleting pods to avoid race conditions.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    createDiff := len(nodesNeedingDaemonPods)
    deleteDiff := len(podsToDelete)

    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    // error channel to communicate back failures.  make the buffer big enough to avoid any blocking
    errCh := make(chan error, createDiff+deleteDiff)

    klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
    createWait := sync.WaitGroup{}
    // If the returned error is not nil we have a parse error.
    // The controller handles this via the hash.
    generation, err := util.GetTemplateGeneration(ds)
    if err != nil {
        generation = nil
    }
    template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
    // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
    // and double with each successful iteration in a kind of "slow start".
    // This handles attempts to start large numbers of pods that would
    // likely all fail with the same error. For example a project with a
    // low quota that attempts to create a large number of pods will be
    // prevented from spamming the API service with the pod create requests
    // after one of its pods fails.  Conveniently, this also prevents the
    // event spam that those failures would generate.
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                defer createWait.Done()

                podTemplate := template.DeepCopy()
                // The pod's NodeAffinity will be updated to make sure the Pod is bound
                // to the target node by default scheduler. It is safe to do so because there
                // should be no conflicting node affinity with the target node.
                podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
                    podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

                err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
                    ds, metav1.NewControllerRef(ds, controllerKind))

                if err != nil {
                    if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                        // If the namespace is being torn down, we can safely ignore
                        // this error since all subsequent creations will fail.
                        return
                    }
                }
                if err != nil {
                    klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                    dsc.expectations.CreationObserved(dsKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }(i)
        }
        createWait.Wait()
        // any skipped pods that we never attempted to start shouldn't be expected.
        skippedPods := createDiff - (batchSize + pos)
        if errorCount < len(errCh) && skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
            dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
            // The skipped pods will be retried later. The next controller resync will
            // retry the slow start process.
            break
        }
    }

    klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
                dsc.expectations.DeletionObserved(dsKey)
                if !apierrors.IsNotFound(err) {
                    klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }
        }(i)
    }
    deleteWait.Wait()

    // collect errors if any for proper reporting/retry logic in the controller
    errors := []error{}
    close(errCh)
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}
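
The densest part of syncNodes is the slow-start loop: pod creations are issued in batches that start at controller.SlowStartInitialBatchSize (1) and double after each successful iteration, so one systematic failure (for example a quota error) keeps the controller from hammering the API server with hundreds of doomed requests. The standalone sketch below simply replays the loop header for a hypothetical createDiff of 10 to show the batch boundaries it produces (1, 2, 4, and then the remaining 3).

package main

import "fmt"

func intMin(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	createDiff := 10                   // hypothetical number of pods to create
	batchSize := intMin(createDiff, 1) // controller.SlowStartInitialBatchSize is 1
	for pos := 0; createDiff > pos; batchSize, pos = intMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		fmt.Printf("create pods [%d, %d)\n", pos, pos+batchSize)
	}
	// Output: [0,1) [1,3) [3,7) [7,10) — i.e. batches of 1, 2, 4, 3.
}

That covers the main create/delete path of the DaemonSet controller; the rolling-update and status-update logic (rollingUpdate, updateDaemonSetStatus) called from syncDaemonSet is not covered in this post.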