DaemonSet Controller

A DaemonSet ensures that all (or some) nodes in a cluster run a copy of a pod. As nodes join the cluster, pods are added to them; as nodes leave the cluster, those pods are garbage collected. Deleting a DaemonSet deletes all the pods it created, so the pods of a DaemonSet effectively cover the whole cluster.

A DaemonSet is useful whenever the same pod needs to run on every node of the cluster. Typical use cases include:

  • Running a cluster storage daemon, such as glusterd or ceph.
  • Running a log collection daemon on every node, such as fluentd or logstash.
  • Running a node monitoring daemon on every node, such as Prometheus Node Exporter, collectd, Datadog agent, New Relic agent, or Ganglia gmond.

DaemonSet vs. Deployment

The replica pods of a Deployment are spread across the nodes, and each node may run several replicas. A DaemonSet is different: each node runs at most one replica. DaemonSets are therefore particularly well suited to processes that perform system maintenance or provide system-level services. The two controllers are similar in that both manage pod deployment; the difference is that the Deployment controller deploys application services, while the DaemonSet controller deploys system-level services.

The daemonset controller is one of the many controllers inside the kube-controller-manager component and is the controller for the DaemonSet resource. It watches four resource types: DaemonSet, Pod, Node and ControllerRevision. Whenever one of these resources changes, the daemonset controller is triggered to reconcile the corresponding DaemonSet: creating pods on suitable nodes, deleting pods from unsuitable nodes, performing rolling updates, updating the DaemonSet status, and cleaning up old DaemonSet revisions.

Creating a DaemonSet

Below is an example DaemonSet spec that runs the fluentd-elasticsearch image:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-elasticsearch
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
spec:
  selector:
    matchLabels:
      name: fluentd-elasticsearch
  template:
    metadata:
      labels:
        name: fluentd-elasticsearch
    spec:
      tolerations:
      - key: node-role.kubernetes.io/master
        effect: NoSchedule
      containers:
      - name: fluentd-elasticsearch
        image: k8s.gcr.io/fluentd-elasticsearch:1.20
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers

The DaemonSet above does not set a restart policy, so it defaults to Always. If the field is set, its value must be Always; otherwise the DaemonSet is rejected as invalid at creation time.

DaemonSets are also repelled by taints: unless a matching toleration is added to the spec, a DaemonSet will not place pods on nodes that carry a taint. The example above contains the following:

tolerations:
- key: node-role.kubernetes.io/master
  effect: NoSchedule

The reason is that the system adds the "node-role.kubernetes.io/master" taint to master nodes by default, to repel ordinary pods and keep the masters as dedicated nodes. Because we expect the example DaemonSet to be deployed cluster-wide, a matching toleration has to be added.

If the DaemonSet should only run on particular nodes, add a node selector or node affinity under .spec.template.spec (the .spec.template.spec.nodeSelector or .spec.template.spec.affinity field); the DaemonSet will then only place pods on nodes that match. In short, taints, tolerations, affinity and node labels can all be used to control the set of nodes on which a DaemonSet deploys pods.
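For example, a minimal sketch of restricting the example DaemonSet to a subset of nodes with a nodeSelector; the disktype=ssd label is purely illustrative and assumes the nodes have been labelled accordingly (e.g. kubectl label nodes <node-name> disktype=ssd):

# excerpt of the DaemonSet manifest; only .spec.template.spec.nodeSelector is added
spec:
  template:
    spec:
      nodeSelector:
        disktype: ssd    # pods are only created on nodes carrying this label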

Save the above content to a file named daemonset.yaml and create the DaemonSet with the following command:

kubectl create -f https://k8s.io/examples/controllers/daemonset.yaml

How does the system schedule DaemonSet pods?

By default, when the DaemonSet controller creates a pod it sets the spec.nodeName field, meaning the node on which the pod will run is already decided at creation time. DaemonSet pods are therefore not actually scheduled by the kubernetes scheduler (they need no scheduling), which gives rise to two properties:

DaemonSet pods ignore a node's unreachable condition: even when the system marks a node as unreachable, the DaemonSet still tries to run pods on it.
During cluster bootstrap, DaemonSet pods can be placed on any node in the cluster even before the kubernetes scheduler has been deployed; this is mainly useful in the bootstrap phase.
Because DaemonSet pods are scheduled differently from regular pods, two problems arise:

Inconsistent pod behavior. A regular pod waits in the Pending phase after creation until it is scheduled; DaemonSet pods skip scheduling and never show this phase, which confuses users.
Pod preemption and priority are implemented by the kubernetes scheduler, so when pod priority is enabled it is simply ignored for DaemonSet pods.
To solve these two problems, kubernetes added a setting that lets DaemonSet pods be scheduled by the kubernetes scheduler (ScheduleDaemonSetPods), introduced as an alpha feature in 1.11 and later graduated to a stable feature. The mechanism: when creating pods, the DaemonSet controller no longer sets .spec.nodeName and uses nodeAffinity instead, for example:

nodeAffinity:
  requiredDuringSchedulingIgnoredDuringExecution:
    nodeSelectorTerms:
    - matchFields:
      - key: metadata.name
        operator: In
        values:
        - target-host-name

Here "target-host-name" is exactly the value that would previously have gone into .spec.nodeName, so the pod is now scheduled by the kubernetes scheduler, which solves the two problems above. DaemonSets nevertheless keep some scheduling behavior of their own: as mentioned earlier, they are not constrained by a node's unreachable condition. To preserve this behavior while using the kubernetes scheduler, the cluster's "TaintNodesByCondition" feature must be enabled, and if the DaemonSet uses the host network the following toleration must be added to it:

node.kubernetes.io/network-unavailable:NoSchedule
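Expressed as a pod-spec toleration, this would look roughly as follows (a sketch; operator: Exists is the usual way to tolerate a taint regardless of its value):

tolerations:
- key: node.kubernetes.io/network-unavailable
  operator: Exists
  effect: NoSchedule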

Tolerations automatically added to DaemonSet pods
Under certain conditions, such as low disk space or an unreachable network, the system automatically adds taints to nodes to stop new pods from being scheduled onto nodes that no longer qualify. A DaemonSet, however, is meant to run on every eligible node and should not be interrupted by such taints, so the system also adds a default set of tolerations to DaemonSet pods, listed in the table below:

| Toleration Key | Effect | Alpha Features | Version | Description |
| --- | --- | --- | --- | --- |
| node.kubernetes.io/not-ready | NoExecute | TaintBasedEvictions | 1.8+ | When TaintBasedEvictions is enabled, DaemonSet pods will not be evicted when there are node problems such as a network partition. |
| node.kubernetes.io/unreachable | NoExecute | TaintBasedEvictions | 1.8+ | When TaintBasedEvictions is enabled, DaemonSet pods will not be evicted when there are node problems such as a network partition. |
| node.kubernetes.io/disk-pressure | NoSchedule | TaintNodesByCondition | 1.8+ | |
| node.kubernetes.io/memory-pressure | NoSchedule | TaintNodesByCondition | 1.8+ | |
| node.kubernetes.io/unschedulable | NoSchedule | ScheduleDaemonSetPods, TaintNodesByCondition | 1.11+ | When ScheduleDaemonSetPods is enabled, TaintNodesByCondition is necessary to make sure DaemonSet pods tolerate the unschedulable attribute under the default scheduler. |
| node.kubernetes.io/network-unavailable | NoSchedule | ScheduleDaemonSetPods, TaintNodesByCondition, hostNetwork | 1.11+ | When ScheduleDaemonSetPods is enabled, TaintNodesByCondition is necessary to make sure DaemonSet pods that use host network tolerate the network-unavailable attribute under the default scheduler. |
| node.kubernetes.io/out-of-disk | NoSchedule | ExperimentalCriticalPodAnnotation (critical pod only), TaintNodesByCondition | 1.8+ | |
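For illustration, these automatically added tolerations show up in the pod spec of a DaemonSet pod (visible via kubectl get pod <pod-name> -o yaml) roughly as follows; this is a sketch derived from the table above, and the exact set depends on the cluster version and the enabled feature gates:

tolerations:
- key: node.kubernetes.io/not-ready
  operator: Exists
  effect: NoExecute
- key: node.kubernetes.io/unreachable
  operator: Exists
  effect: NoExecute
- key: node.kubernetes.io/disk-pressure
  operator: Exists
  effect: NoSchedule
- key: node.kubernetes.io/memory-pressure
  operator: Exists
  effect: NoSchedule
- key: node.kubernetes.io/unschedulable
  operator: Exists
  effect: NoSchedule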

Ways to communicate with the pods of a DaemonSet

  • Push: the daemon pods collect data and push it to another service, e.g. sending collected statistics to a stats database.
  • NodeIP and Known Port: DaemonSet pods can be configured to use a host port, and since clients can easily obtain the list of node IPs, they can reach the daemon pods via node IP and port.
  • DNS: create a headless service whose selector matches the DaemonSet pods; the pods then become the endpoints of the headless service, similar to a StatefulSet (see the example below).
  • Service: create a Service that selects the DaemonSet pods, giving clients a single entry point and load balancing.
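As a concrete example of the DNS option, a minimal headless Service for the fluentd-elasticsearch DaemonSet above could look like the following sketch (the Service name and port are illustrative assumptions; only the selector must match the pod template labels):

apiVersion: v1
kind: Service
metadata:
  name: fluentd-elasticsearch    # hypothetical service name
  namespace: kube-system
spec:
  clusterIP: None                # headless: DNS returns the individual pod IPs instead of a virtual IP
  selector:
    name: fluentd-elasticsearch  # matches .spec.template.metadata.labels of the DaemonSet
  ports:
  - name: forward                # illustrative port
    port: 24224

A DNS query for fluentd-elasticsearch.kube-system.svc then returns the IPs of all daemon pods instead of a single cluster IP.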

Initialization and startup of the daemonset controller

startDaemonSetController

Main logic of startDaemonSetController:
(1) call daemon.NewDaemonSetsController to create and initialize the DaemonSetsController;
(2) start a goroutine running the DaemonSetsController's Run method.

// cmd/kube-controller-manager/app/apps.go
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
        return nil, false, nil
    }
    dsc, err := daemon.NewDaemonSetsController(
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        ctx.InformerFactory.Apps().V1().ControllerRevisions(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
        flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
    }
    go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
    return nil, true, nil
}

daemon.NewDaemonSetsController

As the daemon.NewDaemonSetsController code shows, the daemonset controller registers EventHandlers for the DaemonSet, Node, Pod and ControllerRevision objects, i.e. it listens for events on these four objects, puts the events on the work queue and processes them. It also assigns dsc.syncDaemonSet to dsc.syncHandler, registering it as the core processing method; dsc.Run later calls this core method to reconcile DaemonSet objects (it is analyzed in detail further below).

// pkg/controller/daemon/daemon_controller.go
func NewDaemonSetsController(
    daemonSetInformer appsinformers.DaemonSetInformer,
    historyInformer appsinformers.ControllerRevisionInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    kubeClient clientset.Interface,
    failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dsc := &DaemonSetsController{
        kubeClient:    kubeClient,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        },
        crControl: controller.RealControllerRevisionControl{
            KubeClient: kubeClient,
        },
        burstReplicas: BurstReplicas,
        expectations:  controller.NewControllerExpectations(),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
    }

    daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ds := obj.(*apps.DaemonSet)
            klog.V(4).Infof("Adding daemon set %s", ds.Name)
            dsc.enqueueDaemonSet(ds)
        },
        UpdateFunc: func(old, cur interface{}) {
            oldDS := old.(*apps.DaemonSet)
            curDS := cur.(*apps.DaemonSet)
            klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
            dsc.enqueueDaemonSet(curDS)
        },
        DeleteFunc: dsc.deleteDaemonset,
    })
    dsc.dsLister = daemonSetInformer.Lister()
    dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

    historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addHistory,
        UpdateFunc: dsc.updateHistory,
        DeleteFunc: dsc.deleteHistory,
    })
    dsc.historyLister = historyInformer.Lister()
    dsc.historyStoreSynced = historyInformer.Informer().HasSynced

    // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
    // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addPod,
        UpdateFunc: dsc.updatePod,
        DeleteFunc: dsc.deletePod,
    })
    dsc.podLister = podInformer.Lister()

    // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
    podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
        "nodeName": indexByPodNodeName,
    })
    dsc.podNodeIndex = podInformer.Informer().GetIndexer()
    dsc.podStoreSynced = podInformer.Informer().HasSynced

    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addNode,
        UpdateFunc: dsc.updateNode,
    },
    )
    dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
    dsc.nodeLister = nodeInformer.Lister()

    dsc.syncHandler = dsc.syncDaemonSet
    dsc.enqueueDaemonSet = dsc.enqueue

    dsc.failedPodsBackoff = failedPodsBackoff

    return dsc, nil
}

dsc.Run

The key part is the for loop: based on the value of workers (default 2), it starts that many goroutines running dsc.runWorker, whose main job is to call the daemonset controller's core processing method dsc.syncDaemonSet described earlier.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dsc.queue.ShutDown()

    klog.Infof("Starting daemon sets controller")
    defer klog.Infof("Shutting down daemon sets controller")

    if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dsc.runWorker, time.Second, stopCh)
    }

    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

    <-stopCh
}

dsc.runWorker

It takes an event key from the queue and calls dsc.syncHandler, i.e. dsc.syncDaemonSet, to reconcile it. As mentioned earlier, the events in the queue come from the EventHandlers that the daemonset controller registered for the DaemonSet, Node, Pod and ControllerRevision objects: their change events are observed and placed on the queue.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) runWorker() {
    for dsc.processNextWorkItem() {
    }
}

func (dsc *DaemonSetsController) processNextWorkItem() bool {
    dsKey, quit := dsc.queue.Get()
    if quit {
        return false
    }
    defer dsc.queue.Done(dsKey)

    err := dsc.syncHandler(dsKey.(string))
    if err == nil {
        dsc.queue.Forget(dsKey)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
    dsc.queue.AddRateLimited(dsKey)

    return true
}

All of the above is routine setup; the interesting part is the processing logic below.

Analysis of the daemonset controller's core processing logic

Let's go straight to the daemonset controller's core processing method, syncDaemonSet.

Main logic:
(1) record the current time on entry and defer a function that computes the total execution time, i.e. how long one reconciliation of a daemonset takes;
(2) get the daemonset object by its namespace and name;
(3) get the list of all node objects;
(4) check whether the daemonset's DeletionTimestamp is set; if it is, return immediately: the daemonset is being deleted and needs no reconciliation;
(5) call dsc.constructHistory to get the daemonset's revision history;
(6) call dsc.expectations.SatisfiedExpectations to check whether the daemonset satisfies the expectations mechanism (same purpose as in the replicaset controller analysis, not expanded here); if not, call dsc.updateDaemonSetStatus to update the status and return;
(7) call dsc.manage; this method does not distinguish between old- and new-revision pods, it only ensures that a daemonset pod runs on every eligible node, creating pods on eligible nodes that lack one and deleting daemonset pods from ineligible nodes;
(8) call dsc.expectations.SatisfiedExpectations again; if expectations are satisfied, check the daemonset's update strategy: for a rolling update, call dsc.rollingUpdate, which deletes old pods according to the configured rolling-update settings (pods are created in dsc.manage);
when the update strategy is OnDelete, nothing extra is done here, because new DaemonSet pods are only created after the old pods have been deleted manually, at which point dsc.manage creates the new-revision pods;
(9) call dsc.cleanupHistory to clean up revisions that no longer have any pods, according to spec.revisionHistoryLimit and revision age (oldest revisions are cleaned up first);
(10) finally, call dsc.updateDaemonSetStatus to update the daemonset's status based on how its pods are currently deployed, the pods' states, and whether the nodes satisfy the pods' running conditions.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(3).Infof("daemon set has been deleted %v", key)
        dsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
    }

    nodeList, err := dsc.nodeLister.List(labels.Everything())
    if err != nil {
        return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
    }

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(ds.Spec.Selector, &everything) {
        dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
        return nil
    }

    // Don't process a daemon set until all its creations and deletions have been processed.
    // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
    // then we do not want to call manage on foo until the daemon pods have been created.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    // If the DaemonSet is being deleted (either by foreground deletion or
    // orphan deletion), we cannot be sure if the DaemonSet history objects
    // it owned still exist -- those history objects can either be deleted
    // or orphaned. Garbage collector doesn't guarantee that it will delete
    // DaemonSet pods before deleting DaemonSet history objects, because
    // DaemonSet history doesn't own DaemonSet pods. We cannot reliably
    // calculate the status of a DaemonSet being deleted. Therefore, return
    // here without updating status for the DaemonSet being deleted.
    if ds.DeletionTimestamp != nil {
        return nil
    }

    // Construct histories of the DaemonSet, and get the hash of current history
    cur, old, err := dsc.constructHistory(ds)
    if err != nil {
        return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
    }
    hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

    if !dsc.expectations.SatisfiedExpectations(dsKey) {
        // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
        return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
    }

    err = dsc.manage(ds, nodeList, hash)
    if err != nil {
        return err
    }

    // Process rolling updates if we're ready.
    if dsc.expectations.SatisfiedExpectations(dsKey) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }

    err = dsc.cleanupHistory(ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
    }

    return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

dsc.manage

The dsc.manage method does not distinguish between old- and new-revision daemonset pods; it ensures that a daemonset pod runs on every eligible node, creating a pod on eligible nodes that lack one and deleting daemonset pods from nodes that are no longer eligible.

Main logic:
(1) call dsc.getNodesToDaemonPods to get all pods of the daemonset via its selector and return a map associating each node with its daemon pods;
(2) iterate over the node list obtained earlier and call dsc.podsShouldBeOnNode for each node; based on the pod's nodeName, nodeSelector, tolerations of the node's taints and so on, together with the node object itself, it determines whether the node already has a pod of this daemonset and whether a pod should be created on, or deleted from, that node;
(3) call getUnscheduledPodsWithoutNode to compare each pod's nodeName against the node list and add pods whose node no longer exists to the list of pods to delete;
(4) call dsc.syncNodes with the list of nodes that need a pod created and the list of pods to delete, and perform the corresponding create and delete operations.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // Find out the pods which are created for the nodes by DaemonSet.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds)

        if err != nil {
            continue
        }

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
    // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
    podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

    // Label new pods using the hash label value of the current history when creating them
    if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}

dsc.podsShouldBeOnNode

The dsc.podsShouldBeOnNode method decides whether a node needs to run a daemonset pod. It returns nodesNeedingDaemonPods and podsToDelete, the nodes that need a daemon pod and the pods that should be deleted, respectively.

Main logic:
(1) call dsc.nodeShouldRunDaemonPod, which returns shouldSchedule and shouldContinueRunning, indicating respectively whether a daemon pod should be scheduled onto the node and whether a daemon pod on the node may keep running;
(2) if shouldSchedule is true and no daemon pod exists on the node yet, add the node to nodesNeedingDaemonPods;
(3) if shouldContinueRunning is true, collect the daemon pods on the node that are still running (failed pods are marked for deletion, subject to a backoff); if more than one is running, sort them by creation time, keep only the oldest and add the rest to podsToDelete;
(4) if shouldContinueRunning is false, i.e. the daemon pod should not keep running on the node, and daemon pods exist on that node, add all of them to podsToDelete;
(5) return nodesNeedingDaemonPods and podsToDelete, the nodes that need a daemon pod and the pods to be deleted.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) podsShouldBeOnNode(
    node *v1.Node,
    nodeToDaemonPods map[string][]*v1.Pod,
    ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {

    _, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
    if err != nil {
        return
    }

    daemonPods, exists := nodeToDaemonPods[node.Name]

    switch {
    case shouldSchedule && !exists:
        // If daemon pod is supposed to be running on node, but isn't, create daemon pod.
        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
    case shouldContinueRunning:
        // If a daemon pod failed, delete it
        // If there's non-daemon pods left on this node, we will create it in the next sync loop
        var daemonPodsRunning []*v1.Pod
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            if pod.Status.Phase == v1.PodFailed {
                // This is a critical place where DS is often fighting with kubelet that rejects pods.
                // We need to avoid hot looping and backoff.
                backoffKey := failedPodsBackoffKey(ds, node.Name)

                now := dsc.failedPodsBackoff.Clock.Now()
                inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
                if inBackoff {
                    delay := dsc.failedPodsBackoff.Get(backoffKey)
                    klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
                        pod.Namespace, pod.Name, node.Name, delay)
                    dsc.enqueueDaemonSetAfter(ds, delay)
                    continue
                }

                dsc.failedPodsBackoff.Next(backoffKey, now)

                msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
                klog.V(2).Infof(msg)
                // Emit an event so that it's discoverable to users.
                dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
                podsToDelete = append(podsToDelete, pod.Name)
            } else {
                daemonPodsRunning = append(daemonPodsRunning, pod)
            }
        }
        // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
        // Sort the daemon pods by creation time, so the oldest is preserved.
        if len(daemonPodsRunning) > 1 {
            sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
            for i := 1; i < len(daemonPodsRunning); i++ {
                podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
            }
        }
    case !shouldContinueRunning && exists:
        // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            podsToDelete = append(podsToDelete, pod.Name)
        }
    }

    return nodesNeedingDaemonPods, podsToDelete, nil
}

dsc.nodeShouldRunDaemonPod

The dsc.nodeShouldRunDaemonPod method is not analyzed in depth here. It mainly calls dsc.simulate to run the Predicates checks that decide whether a node can run the pod. Based on the predicate results it returns wantToRun, shouldSchedule and shouldContinueRunning, which indicate respectively whether the node matches the pod's selector, taints and so on (ignoring whether the node has enough resources), whether the daemon pod should be scheduled onto the node, and whether a daemon pod on the node may keep running; if all predicates pass, all three are true.

dsc.syncNodes

dsc.syncNodes is the method in which the daemonset controller actually creates and deletes pods.

This method also relies on the expectations mechanism, which has the same purpose and essentially the same usage as in the replicaset controller; refer back to the replicaset controller analysis if needed, it is not expanded again here.

Main logic:
(1) compute the number of pods to create and to delete, each capped at dsc.burstReplicas (250): a single reconciliation of a daemonset can create/delete at most 250 pods, and anything beyond that waits for the next reconciliation;
(2) call dsc.expectations.SetExpectations to set the expectations;
(3) call util.CreatePodTemplate to compute the pod template to create;
(4) create pods first: as in the replicaset controller, creation uses a slow-start algorithm and proceeds in batches: 1 pod in the first batch, 2 in the second, then 4, doubling each time until the desired count is reached; each batch starts as many goroutines as there are pods to create, one per pod, and waits on a WaitGroup for the whole batch to finish before starting the next;
(5) then delete pods: for every pod to delete, a goroutine is started to perform the deletion, and a WaitGroup waits for all of them to finish.

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    // We need to set expectations before creating/deleting pods to avoid race conditions.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    createDiff := len(nodesNeedingDaemonPods)
    deleteDiff := len(podsToDelete)

    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    // error channel to communicate back failures.  make the buffer big enough to avoid any blocking
    errCh := make(chan error, createDiff+deleteDiff)

    klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
    createWait := sync.WaitGroup{}
    // If the returned error is not nil we have a parse error.
    // The controller handles this via the hash.
    generation, err := util.GetTemplateGeneration(ds)
    if err != nil {
        generation = nil
    }
    template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
    // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
    // and double with each successful iteration in a kind of "slow start".
    // This handles attempts to start large numbers of pods that would
    // likely all fail with the same error. For example a project with a
    // low quota that attempts to create a large number of pods will be
    // prevented from spamming the API service with the pod create requests
    // after one of its pods fails.  Conveniently, this also prevents the
    // event spam that those failures would generate.
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                defer createWait.Done()

                podTemplate := template.DeepCopy()
                // The pod's NodeAffinity will be updated to make sure the Pod is bound
                // to the target node by default scheduler. It is safe to do so because there
                // should be no conflicting node affinity with the target node.
                podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
                    podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

                err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
                    ds, metav1.NewControllerRef(ds, controllerKind))

                if err != nil {
                    if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                        // If the namespace is being torn down, we can safely ignore
                        // this error since all subsequent creations will fail.
                        return
                    }
                }
                if err != nil {
                    klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                    dsc.expectations.CreationObserved(dsKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }(i)
        }
        createWait.Wait()
        // any skipped pods that we never attempted to start shouldn't be expected.
        skippedPods := createDiff - (batchSize + pos)
        if errorCount < len(errCh) && skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
            dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
            // The skipped pods will be retried later. The next controller resync will
            // retry the slow start process.
            break
        }
    }

    klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
                klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                dsc.expectations.DeletionObserved(dsKey)
                errCh <- err
                utilruntime.HandleError(err)
            }
        }(i)
    }
    deleteWait.Wait()

    // collect errors if any for proper reporting/retry logic in the controller
    errors := []error{}
    close(errCh)
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}

dsc.rollingUpdate

The dsc.rollingUpdate method handles the rolling update of a daemonset: based on the configured rolling-update settings it deletes old pods (pod creation happens in dsc.manage).
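For reference, this behavior is driven by the DaemonSet's update strategy. A typical configuration looks like the following sketch (maxUnavailable: 1 happens to be the default value and is shown only for illustration):

spec:
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1    # at most this many daemon pods may be unavailable during the update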

Main logic:
(1) call dsc.getNodesToDaemonPods to get the map from nodes to the daemonset's pods;
(2) call dsc.getAllDaemonSetPods to get all pods belonging to old revisions of the daemonset;
(3) call dsc.getUnavailableNumbers to get maxUnavailable from the daemonset's rolling-update configuration and numUnavailable, the number of eligible nodes that either have no daemon pod or whose daemon pod is unavailable;
(4) call util.SplitByAvailablePods to split the old-revision pods into an oldAvailablePods list and an oldUnavailablePods list;
(5) define a string slice oldPodsToDelete to hold the pods that are to be deleted;
(6) add all of oldUnavailablePods to oldPodsToDelete;
(7) iterate over oldAvailablePods: while numUnavailable is smaller than maxUnavailable, add the pod to oldPodsToDelete and increment numUnavailable;
(8) call dsc.syncNodes to delete the pods in oldPodsToDelete.

// pkg/controller/daemon/update.go
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }

    _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
    maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }
    oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

    // for oldPods delete all not running pods
    var oldPodsToDelete []string
    klog.V(4).Infof("Marking all unavailable old pods for deletion")
    for _, pod := range oldUnavailablePods {
        // Skip terminating pods. We won't delete them again
        if pod.DeletionTimestamp != nil {
            continue
        }
        klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
    }

    klog.V(4).Infof("Marking old pods for deletion")
    for _, pod := range oldAvailablePods {
        if numUnavailable >= maxUnavailable {
            klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
            break
        }
        klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
        numUnavailable++
    }
    return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}

dsc.updateDaemonSetStatus

The dsc.updateDaemonSetStatus method updates the daemonset's status based on how its pods are currently deployed, the pods' states, and whether the nodes satisfy the pods' running conditions. The code is not analyzed here; instead, here is the meaning of each field in the daemonset status.

(1) currentNumberScheduled: the number of nodes on which a daemonset pod has been scheduled;
(2) desiredNumberScheduled: the number of nodes that should run a daemonset pod;
(3) numberMisscheduled: the number of nodes that should not run a daemonset pod but do;
(4) numberAvailable: the number of pods in the Available state (a pod is considered Available once it has been Ready for MinReadySeconds);
(5) numberReady: the number of pods in the Ready state;
(6) numberUnavailable: desiredNumberScheduled - numberAvailable;
(7) updatedNumberScheduled: the number of nodes running a daemonset pod of the latest revision.
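As an illustration, the status of a healthy DaemonSet on a three-node cluster might look roughly like this (all numbers are hypothetical):

status:
  observedGeneration: 1        # generation of the spec last processed by the controller
  currentNumberScheduled: 3
  desiredNumberScheduled: 3
  numberMisscheduled: 0
  numberReady: 3
  numberAvailable: 3
  updatedNumberScheduled: 3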

Summary

The flow in which the daemonset controller creates pods resembles that of the replicaset controller: both use the expectations mechanism and both limit the number of pods that may be created or deleted in a single reconciliation. Like a statefulset, a daemonset offers two update strategies, OnDelete and RollingUpdate: with OnDelete the old pods have to be deleted manually before the daemonset controller creates new ones, while RollingUpdate differs from statefulset and deployment in that it deletes an old pod first and then creates its replacement, whereas a deployment can bring up new pods before removing old ones.
