简介
daemonset controller 监控 damonset,pod 以及 node 的变化,然后进行相应的 pod 创建或者删除
然后 kubelet 监控到pod改动,进行实际的创建或者删除
源码
cmd/kube-controller-manager/app/apps.go 中
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
return nil, false, nil
}
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
return nil, true, nil
}
pkg/controller/daemon/daemon_controller.go 中
func NewDaemonSetsController(
daemonSetInformer appsinformers.DaemonSetInformer,
historyInformer appsinformers.ControllerRevisionInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
},
crControl: controller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
}
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addDaemonset,
UpdateFunc: dsc.updateDaemonset,
DeleteFunc: dsc.deleteDaemonset,
})
dsc.dsLister = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addHistory,
UpdateFunc: dsc.updateHistory,
DeleteFunc: dsc.deleteHistory,
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podLister = podInformer.Lister()
// This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": indexByPodNodeName,
})
dsc.podNodeIndex = podInformer.Informer().GetIndexer()
dsc.podStoreSynced = podInformer.Informer().HasSynced
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeLister = nodeInformer.Lister()
dsc.syncHandler = dsc.syncDaemonSet
dsc.enqueueDaemonSet = dsc.enqueue
dsc.failedPodsBackoff = failedPodsBackoff
return dsc, nil
}
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dsc.queue.ShutDown()
klog.Infof("Starting daemon sets controller")
defer klog.Infof("Shutting down daemon sets controller")
if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(dsc.runWorker, time.Second, stopCh)
}
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
<-stopCh
}
func (dsc *DaemonSetsController) runWorker() {
for dsc.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
dsKey, quit := dsc.queue.Get()
if quit {
return false
}
defer dsc.queue.Done(dsKey)
err := dsc.syncHandler(dsKey.(string))
if err == nil {
dsc.queue.Forget(dsKey)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
dsc.queue.AddRateLimited(dsKey)
return true
}
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(3).Infof("daemon set has been deleted %v", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
}
nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
}
everything := metav1.LabelSelector{}
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
return nil
}
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
// If the DaemonSet is being deleted (either by foreground deletion or
// orphan deletion), we cannot be sure if the DaemonSet history objects
// it owned still exist -- those history objects can either be deleted
// or orphaned. Garbage collector doesn't guarantee that it will delete
// DaemonSet pods before deleting DaemonSet history objects, because
// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
// calculate the status of a DaemonSet being deleted. Therefore, return
// here without updating status for the DaemonSet being deleted.
if ds.DeletionTimestamp != nil {
return nil
}
// Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
if !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}
err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
}
// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds, nodeList, hash)
}
if err != nil {
return err
}
}
err = dsc.cleanupHistory(ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
// Find out the pods which are created for the nodes by DaemonSet.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds)
if err != nil {
continue
}
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
}
// Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
// Label new pods using the hash label value of the current history when creating them
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
return nil
}
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
// error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff)
klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
podTemplate := template.DeepCopy()
// The pod's NodeAffinity will be updated to make sure the Pod is bound
// to the target node by default scheduler. It is safe to do so because there
// should be no conflicting node affinity with the target node.
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
}
if err != nil {
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
}
klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey)
if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
errCh <- err
utilruntime.HandleError(err)
}
}
}(i)
}
deleteWait.Wait()
// collect errors if any for proper reporting/retry logic in the controller
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}