client-go源码分析

一、client-go informer简介

informer.png

从图中可以看出,k8s informer主要包括以下几个部分:

1.Reflector

(1)Reflector调用kube-apiserver的list接口获取资源对象列表,然后调用DeltaFIFO的Replace方法将object包装成Sync/Deleted类型的Delta丢进DeltaFIFO中;

(2)Reflector调用kube-apiserver的watch接口获取资源对象的变化,然后调用DeltaFIFO的Add/Update/Delete方法将object包装成Added/Updated/Deleted类型的Delta丢到DeltaFIFO中;

2.DeltaFIFO

DeltaFIFO中存储着一个map和一个queue;

(1)其中queue可以看成是一个先进先出队列,一个object进入DeltaFIFO中,会判断queue中是否已经存在该object key,不存在则添加到队尾;

(2)map即map[object key]Deltas,是object key和Deltas的映射,Deltas是Delta的切片类型,Delta中存储着DeltaType和object;

DeltaType有4种,分别是Added、Updated、Deleted、Sync

3.Controller

Controller从DeltaFIFO的queue中pop一个object key出来,并从DeltaFIFO的map中获取其对应的 Deltas出来进行处理,遍历Deltas,根据object的变化类型更新Indexer本地缓存,并通知Processor相关对象有变化事件发生:

(1)如果DeltaType是Deleted,则调用Indexer的Delete方法,将Indexer本地缓存中的object删除,并构造deleteNotification,通知Processor做处理;

(2)如果DeltaType是Added/Updated/Sync,调用Indexer的Get方法从Indexer本地缓存中获取该对象,存在则调用Indexer的Update方法来更新Indexer缓存中的该对象,随后构造updateNotification,通知Processor做处理;如果Indexer中不存在该对象,则调用Indexer的Add方法将该对象存入本地缓存中,并构造addNotification,通知Processor做处理;

4.Processor

Processor根据Controller的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的ResourceEventHandler(addFunc、updateFunc、deleteFunc)将object key加入工作队列workqueue给custom controller处理。

5.Indexer

Indexer是资源对象的本地缓存,依赖于threadSafeMap的items,indexers,indices
items:items是一个map,本地存储所有资源对象,key为资源对象的namespace/name组成,value为资源对象本身
indexers,indices:索引

6.ResourceEventHandler

调用相应的ResourceEventHandler(addFunc、updateFunc、deleteFunc)将object key加入工作队列workqueue给custom controller处理

7.informer使用示例代码

func main() {
    // 自定义与kube-apiserver通信的config配置
    kubeconfig := "~/.kube/config"

    //①构建与kube-apiserver通信的config配置;
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        klog.Fatalf("Failed to create config: %v", err)
    }

    // ②初始化与apiserver通信的clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Failed to create client: %v", err)
    }

    // ③初始化shared informer factory
    factory := informers.NewSharedInformerFactory(clientset, 30 * time.Second)
    // 每个资源对象都有一个informer,例如PodInformer,并有2个接口方法Informer(初始化informer), Lister(创建podLister)
    podInformer := factory.Core().V1().Pods()
    // ④初始化podInformer并保存在factory
    informer := podInformer.Informer()

    // ⑤注册informer的自定义ResourceEventHandler
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: onUpdate,
        DeleteFunc: onDelete,
    })

    stopCh := make(chan struct{})
    defer close(stopCh)

    // ⑥启动shared informer factory,从而启动factory里面每个informer从api-server去list和watch,
    factory.Start(stopCh)

    // ⑦等待所有启动的 Informer 的缓存indexer被同步,因为每个k8s资源对象都有一个对应的Informer,再调podLister从缓存indexer中获取k8s资源对象
    factory.WaitForCacheSync(stopCh)
       // ⑧调用Lister接口方法创建lister,从informer中的indexer本地缓存中获取资源对象;
    podLister := podInformer.Lister()
    podList, err := podLister.List(labels.Everything())
    for _, pod := range podList {
        klog.Infof("podName: %s", pod.Name)
    }

    <-stopCh
}

二、informer初始化与启动

1.SharedInformerFactory的初始化

// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
    // 连接k8s的client: clientset
    client           kubernetes.Interface
    namespace        string
    defaultResync    time.Duration
    customResync     map[reflect.Type]time.Duration
    // key为obj资源类型,value为对应资源的sharedIndexInformer
    informers map[reflect.Type]cache.SharedIndexInformer
    // 记录已经启动的informer
    startedInformers map[reflect.Type]bool
}

2.informer初始化

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    // Indexer中资源对象的本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver的请求压力
    indexer    Indexer
    // Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor
    controller Controller
    // Processor根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化;
    processor  *sharedProcessor
    // 从api-sever获取资源对象,保存到DeltaFIFO中
    listerWatcher ListerWatcher

    objectType    runtime.Object

    resyncCheckPeriod time.Duration

    defaultEventHandlerResyncPeriod time.Duration
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

3.启动sharedInformerFactory

sharedInformerFactory.Start方法启动里面所有informer。

// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

4.sharedIndexInformer.Run(核心)

sharedIndexInformer.Run用于启动informer,主要逻辑为:
(1)调用NewDeltaFIFO,初始化DeltaFIFO;
(2)构建Config结构体,这里留意下Process属性,赋值了s.HandleDeltas,后面会分析到该方法;
(3)调用New,利用Config结构体来初始化controller;
(4)调用s.processor.run,启动processor;
(5)调用s.controller.Run,启动controller;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()

    // 1.初始化DeltaFIFO
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    // 2.构建Config结构体
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
      // pop回调函数
        Process: s.HandleDeltas,
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        // 3.初始化controller
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    // 4.启动processor,启动listener.run与调用listener.pop
    wg.StartWithChannel(processorStopCh, s.processor.run)

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    // 5.启动controller,初始化并启动Reflector,调用c.processLoop,开始controller的核心处理
    s.controller.Run(stopCh)
}

4.1 controller启动

4.1.1 controller.Run

①初始化并启动Reflector
②c.processLoop是controller的核心处理,从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生;

// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    // 1.初始化Reflector
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock

    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()

    var wg wait.Group
    defer wg.Wait()
    // 1.启动Reflector
    wg.StartWithChannel(stopCh, r.Run)
    // 2.c.processLoop是controller的核心处理,从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生
    wait.Until(c.processLoop, time.Second, stopCh)
}

4.1.2controller.processLoop

processLoop循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),然后用c.config.Process方法来做处理Deltas,c.config.Process就是s.HandleDeltas

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))    // PopProcessFunc(c.config.Process)表示将c.config.Process转换为PopProcessFunc类型
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

4.1.3 sharedIndexInformer.HandleDeltas

c.config.Process即为s.HandleDeltas方法

HandleDeltas方法的主要逻辑:
(1)循环遍历Deltas,拿到单个Delta;
(2)判断Delta的类型;
(3)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并调用s.processor.distribute方法;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并调用s.processor.distribute方法;
(4)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并调用s.processor.distribute方法;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

三、Reflector源码分析

1.Reflector概述

Reflector的两个核心操作:
1 List&Watch
Reflector从kube-apiserver中list&watch资源对象,然后将对象的变化包装成Delta并将其存到DeltaFIFO中

2.将对象的变化包装成Delta然后扔进DeltaFIFO
Reflector首先通过List操作获取全量的资源对象数据,调用DeltaFIFO的Replace方法全量插入DeltaFIFO,然后后续通过Watch操作根据资源对象的变化类型相应的调用DeltaFIFO的Add、Update、Delete方法,将对象及其变化插入到DeltaFIFO中

2.Reflector初始化与启动分析

2.1 Reflector结构体

先来看到Reflector结构体,这里重点看到以下属性:
①expectedType:放到Store中(即DeltaFIFO中)的对象类型;
②store:store会赋值为DeltaFIFO
③listerWatcher:主要负责list和watch

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
    // name identifies this reflector. By default it will be a file:line if possible.
    name string

    // The name of the type we expect to place in the store. The name
    // will be the stringification of expectedGVK if provided, and the
    // stringification of expectedType otherwise. It is for display
    // only, and should not be used for parsing or comparison.
    expectedTypeName string
    // The type of object we expect to place in the store.
    expectedType reflect.Type
    // The GVK of the object we expect to place in the store if unstructured.
    expectedGVK *schema.GroupVersionKind
    // The destination to sync up with the watch source
    store Store
    // listerWatcher is used to perform lists and watches.
    listerWatcher ListerWatcher
    // period controls timing between one watch ending and
    // the beginning of the next one.
    period       time.Duration
    resyncPeriod time.Duration
    ShouldResync func() bool
    // clock allows tests to manipulate time
    clock clock.Clock
    // lastSyncResourceVersion is the resource version token last
    // observed when doing a sync with the underlying store
    // it is thread safe, but not synchronized with the underlying store
    lastSyncResourceVersion string
    // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchListPageSize is the requested chunk size of initial and resync watch lists.
    // Defaults to pager.PageSize.
    WatchListPageSize int64
}

2.2 Reflector初始化

// k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    r.setExpectedType(expectedType)
    return r
}

2.3 ListerWatcher

ListerWatcher 定义了Reflector最核心的两个方法,List和Watch,用于全量获取资源对象以及监控资源对象的变化

// k8s.io/client-go/tools/cache/listwatch.go
type Lister interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
}

type Watcher interface {
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

type ListerWatcher interface {
    Lister
    Watcher
}

2.4 ListWatch

ListWatch实现了 ListerWatcher interface

// k8s.io/client-go/tools/cache/listwatch.go
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {
    ListFunc  ListFunc
    WatchFunc WatchFunc
    // DisableChunking requests no chunking for this list watcher.
    DisableChunking bool
}

2.5 ListWatch的初始化

NewDeploymentInformer初始化Deployment对象的informer中,会初始化ListWatch并定义其ListFunc与WatchFunc

// staging/src/k8s.io/client-go/informers/apps/v1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
    // Reflector#ListerWatcher
        &cache.ListWatch{
            ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1beta1().Deployments(namespace).List(options)
            },
            WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1beta1().Deployments(namespace).Watch(options)
            },
        },
        &appsv1beta1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

2.6 Reflector启动

其主要是循环调用r.ListAndWatch

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

3.Reflector核心处理方法分析

ListAndWatch的主要逻辑分为三大块:

1 List操作(只执行一次):
(1)调用r.listerWatcher.List方法,执行list操作,即获取全量的资源对象;
(2)调用r.syncWith,调用r.store.Replace处理DeltaFIFO;

2 Resync操作(隔一段时间执行一次);
(1)判断是否需要执行Resync操作,即重新同步;
(2)需要则调用r.store.Resync处理DeltaFIFO;

3 Watch操作
(1)调用r.listerWatcher.Watch,开始监听操作;
(2)调用r.watchHandler,处理watch操作返回来的chan;

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    // 此处ResourceVersion就是k8s资源对象的ResourceVersion字段,每当k8s资源对象发生创建/更新/删除事件时,kubernetes都会更新该资源对象的ResourceVersion,客户端需要持有该resourceVersion,当其他地方修改了resourceVersion后,客户端可以知道该k8s对象发生变化了,要重新获取该k8s资源对象
    options := metav1.ListOptions{ResourceVersion: "0"}
        // 1.List操作(只执行一次)
    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
            // 1.1 调用r.listerWatcher.List方法,执行list操作,即获取全量的资源对象
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
        }
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        // 资源转换,将list操作获取回来的结果转换为[]runtime.Object结构
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // 1.2 调用r.syncWith,调用r.store.Replace,该方法使用从api-server里list到的资源对象 替换 DeltaFIFO中资源对象
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

  // 2 Resync操作(隔一段时间执行一次)
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan() // 定时往resyncCh里面写
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh: 
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            // 2.1判断是否需要执行Resync操作,即重新同步
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                // 2.2需要则调用r.store.Resync操作DeltaFIFO
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    // 3 Watch操作
    for {
      // 判断是否需要退出循环
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

    // 3.1调用r.listerWatcher.Watch,开始监听操作
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            //...
        }

    // 3.2 调用r.watchHandler,处理watch操作返回来的chan,操作DeltaFIFO
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                //...
            }
            return nil
        }
    }
}

list操作时,resourceVersion 有三种设置方法:
(1)第一种:不设置,此时会从直接从etcd中读取,此时数据是最新的;
(2)第二种:设置为“0”,此时从apiserver cache中获取;
(3)第三种:设置为指定的resourceVersion,获取resourceVersion大于指定版本的所有资源对象。

3.1 r.syncWith

r.syncWith方法主要是调用r.store.Replace方法,即根据list的结果去替换store里的items

// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

3.2 r.watchHandler

r.watchHandler主要是处理watch操作返回来的结果,其主要逻辑为循环做以下操作,直至event事件处理完毕:
(1)从watch操作返回来的chan中获取event事件;
(2)chan关闭则退出loop;
(4)区分watch.Added、watch.Modified、watch.Deleted三种类型的event事件,分别调用r.store.Add、r.store.Update、r.store.Delete做处理,具体关于r.store.xxx的方法分析,在后续对DeltaFIFO进行分析时再做具体的分析;
(5)调用r.setLastSyncResourceVersion,为Reflector更新已被处理的最新资源对象的resourceVersion值;

// k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        // 1 从watch操作返回来的chan中获取event事件
        case event, ok := <-w.ResultChan():
          // 2 chan关闭则退出loop
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if r.expectedType != nil {
          // 判断是否期望Type
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
            if r.expectedGVK != nil {
        // 判断group,version,kind是否相同
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }

            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            // 根据watch.Added、watch.Modified、watch.Deleted三种类型的event事件,分别调用r.store.Add、r.store.Update、r.store.Delete,分别将对象类型设置为Add,Update,Delete并添加到DeltaFIFO中
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}

四、DeltaFIFO源码分析

1.DeltaFIFO概述

DeltaFIFO,是一个FIFO是一个先进先出的队列,而Delta包含资源对象数据本身及其变化类型。

Delta的组成:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

type DeltaType string
const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Sync DeltaType = "Sync"
)

DeltaFIFO的组成:

type DeltaFIFO struct {
    // {obj1key: [Delta{Added,obj},Delta{Updated,obj}]}
    items map[string]Deltas
    // [obj1key, obj2key, ...]
    queue []string
    ...
}

type Deltas []Delta

Reflector往DeltaFIFO里面写入,Controller从DeltaFIFO从里面读取;

一个对象能算出一个唯一的object key,其对应着一个Deltas,所以一个对象对应着一个Deltas。

Delta有4种Type,分别是: Added、Updated、Deleted、Sync。针对同一个对象,可能有多个不同Type的Delta元素在Deltas中,表示对该对象做了不同的操作

2.DeltaFIFO的定义与初始化分析

2.1 DeltaFIFO

DeltaFIFO struct定义了DeltaFIFO的一些属性,下面挑几个重要的分析一下。

items:是个map,key根据对象算出,value为Deltas类型;
queue:存储对象obj的key的队列;

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
  // items是个map,key是obj的key,value为Deltas类型
    items map[string]Deltas
  // 存储对象obj的key的队列;
    queue []string

    // populated is true if the first batch of items inserted by Replace() has been populated
    // or Delete/Add/Update was called first.
    populated bool
    // initialPopulationCount is the number of items inserted by the first call of Replace()
    initialPopulationCount int

    // keyFunc is used to make the key used for queued item
    // insertion and retrieval, and should be deterministic.
    keyFunc KeyFunc

    // knownObjects list keys that are "known", for the
    // purpose of figuring out which items have been deleted
    // when Replace() or Delete() is called.
    knownObjects KeyListerGetter

    // Indication the queue is closed.
    // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
    // Currently, not used to gate any of CRED operations.
    closed     bool
    closedLock sync.Mutex

2.2 DeltaFIFO初始化

NewDeltaFIFO初始化了一个items和queue都为空的DeltaFIFO并返回。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      keyFunc,
    // knownObjects = threadSafeMap
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}

3.DeltaFIFO核心处理方法分析

Reflector的核心处理方法里有调用过几个方法,分别是r.store.Replace、r.store.Add、r.store.Update、r.store.Delete,Reflector里的r.store就是DeltaFIFO,方法就是DeltaFIFO的Replace、Add、Update、Delete方法;

当调用api-server的list接口,获取全量资源对象时,资源对象类型设置为Sync并添加到DeltaFIFO中;例如 封装为Delta{ Type: Sync, Object: xxx}
当调用api-server的watch接口,监听获取资源对象事件时,资源对象类型设置为Add/Update/Delete并添加到DeltaFIFO中

sharedIndexInformer.Run方法中调用NewDeltaFIFO初始化了DeltaFIFO,随后将DeltaFIFO作为参数传入初始化Config;

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    ...
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    cfg := &Config{
        Queue:            fifo,
        ...
    }

    func() {
        ...
        s.controller = New(cfg)
        ...
    }()
    ...
    s.controller.Run(stopCh)

在controller的Run方法中,调用NewReflector初始化Reflector时,将之前的DeltaFIFO传入,赋值给Reflector的store属性,所以Reflector里的r.store其实就是DeltaFIFO,而调用的r.store.Replace、r.store.Add、r.store.Update、r.store.Delete方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

func (c *controller) Run(stopCh <-chan struct{}) {
    ...
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
    // reflector.store = deltafifo
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    ...
}

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        ...
    // 实例化Reflector时,传入的store就是deltafifo
        store:         store,
        ...
    }
    ...
    return r
}

DeltaFIFO核心处理方法主要是DeltaFIFO的Replace、Add、Update、Delete方法

3.1 DeltaFIFO.Add

DeltaFIFO的Add操作,主要逻辑:

func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
  // 调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Added类型的新Delta追加到相应的Deltas中;
    return f.queueActionLocked(Added, obj)
}

3.1.1 DeltaFIFO.queueActionLocked

queueActionLocked负责操作DeltaFIFO中的queue与items,根据对象key构造新的Delta追加到对应的Deltas中,主要逻辑:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  // 1.计算出对象的key
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
  // 2.构造新的Delta,将新的Delta追加到Deltas末尾
    newDeltas := append(f.items[id], Delta{actionType, obj})
    // 3.调用dedupDeltas将Delta去重
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
      // 4.判断对象的key是否在queue中,不在则添加入queue中
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        // 5.根据对象key更新items中的Deltas
        f.items[id] = newDeltas
        // 6.通知所有的消费者解除阻塞
        f.cond.Broadcast()
    } else {
        delete(f.items, id)
    }
    return nil
}

3.2 DeltaFIFO.Update

DeltaFIFO的Update操作,主要逻辑:

func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

3.3 DeltaFIFO.Delete

DeltaFIFO的Delete操作,主要逻辑:

func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    // f.knownObjects = threadSafeMap
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
      // items中不存在对象key,则直接return,跳过处理
            return nil
        }
    } else {
        // We only want to skip the "deletion" action if the object doesn't
        // exist in knownObjects and it doesn't have corresponding item in items.
        // Note that even if there is a "deletion" action in items, we can ignore it,
        // because it will be deduped automatically in "queueActionLocked"
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            // Presumably, this was deleted when a relist happened.
            // Don't provide a second report of the same deletion.
            return nil
        }
    }
  // 调用f.queueActionLocked,操作DeltaFIFO中的queue与items,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中
    return f.queueActionLocked(Deleted, obj)
}

3.4 DeltaFIFO.Replace

DeltaFIFO的Replace操作,主要逻辑:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

  // 1.list里面是obj,遍历list计算obj的key,循环调用f.queueActionLocked,操作DeltaFIFO中的queue与items,根据对象key构造Sync类型的新Delta追加到相应的Deltas中
    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }
  // f.knownObjects = threadSafeMap,不为nil
    if f.knownObjects == nil {
        // Do deletion detection against our own list.
        queuedDeletions := 0
        for k, oldItem := range f.items {
            if keys.Has(k) {
                continue
            }
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        if !f.populated {
            f.populated = true
            // While there shouldn't be any queued deletions in the initial
            // population of the queue, it's better to be on the safe side.
                        // replace操作共产生的需要处理的delta数
            f.initialPopulationCount = len(list) + queuedDeletions
        }

        return nil
    }

  // 2.找出indexer中的items有,但传进来Replace方法的list中没有的key,集群里面没有obj,但是indexer中有,需要添加删除事件Delta{Deleted, obj}进行删除
  //调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中
    // Detect deletions not already in the queue.
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        if keys.Has(k) {
            continue
        }

        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

  // 第一次调用Replace方法后,populated值为true
    if !f.populated {
        f.populated = true
        // initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的items数量
        f.initialPopulationCount = len(list) + queuedDeletions
    }

    return nil
}

3.5 DeltaFIFO.Pop

DeltaFIFO的Pop操作,从itmes中弹出Deltas给回调函数PopProcessFunc进行处理,此处回调函数是sharedIndexInformer.HandleDeltas,主要逻辑:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    // 1.循环判断queue的长度是否为0,为0则阻塞住,调用f.cond.Wait(),等待通知(与queueActionLocked方法中的f.cond.Broadcast()相对应,即queue中有对象key则发起通知)
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        // 2.取出queue的队头对象key
        id := f.queue[0]
        // 3.更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去
        f.queue = f.queue[1:]
        // 4.initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        // 5.根据对象key从items中获取对象
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        // 6.把对象从items中删除
        delete(f.items, id)
        // 7.调用PopProcessFunc处理pop出来的对象
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}

3.6 DeltaFIFO.HasSynced

HasSynced指第一次从kube-apiserver中获取到的全量的对象是否全部从DeltaFIFO中pop完成,全部pop完成,说明list回来的对象已经全部同步到了Indexer缓存中去了。

①populated在第一次调用DeltaFIFO的Replace方法中将其设置为true。

②initialPopulationCount的值在第一次调用DeltaFIFO的Replace方法中设置值为加入到items中的Deltas的数量,然后每pop一个Delta,则initialPopulationCount的值减1,pop完成时值则为0。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    return f.populated && f.initialPopulationCount == 0
}

五、sharedProcessor源码分析

1.sharedProcessor

通过 AddEventHandler 函数添加的处理器就会被封装成 processorListener,然后通过 sharedProcessor listeners syncingListeners管理起来

// client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler ResourceEventHandler

    pendingNotifications buffer.RingGrowing
    // ...
}

2.sharedIndexInformer.AddEventHandlerWithResyncPeriod

将ResourceEventHandler封装到ProcessListener中,将ProcessListener添加到sharedProcessor listeners和syncingListeners

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

  // 将ResourceEventHandler封装到
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    if !s.started {
    // 将ProcessListener添加到sharedProcessor listeners和syncingListeners
        s.processor.addListener(listener)
        return
    }


    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}
func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
  // 如果sharedProcessor已经启动,则调用listener.run和listener.pop
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

3.sharedProcessor.distribute

distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        for _, listener := range p.syncingListeners {
      // 将addNotification、updateNotification、deleteNotification对象写入到p.addCh中
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
      // 将addNotification、updateNotification、deleteNotification对象写入到p.addCh中
            listener.add(obj)
        }
    }
}

func (p *processorListener) add(notification interface{}) {
  // 将addNotification、updateNotification、deleteNotification对象写入到p.addCh中
    p.addCh <- notification
}

4.processorListener.pop

processorListener的pop方法,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
    // 0 nextCh为nil,阻塞
    // 3 将事件写入nextCh
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
    // 1 从p.addCh中取出事件
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
        // 2 nextCh指向p.nextCh,可以写入
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

5.processorListener.run

启动processorListener,就是从p.nextCh获取通知对象notification,根据notification类型调用handler.OnAdd/OnUpdate/onDelete

func (p *processorListener) run() {

    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

6.ResourceEventHandler

// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
    if r.AddFunc != nil {
        r.AddFunc(obj)
    }
}

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
    if r.UpdateFunc != nil {
        r.UpdateFunc(oldObj, newObj)
    }
}

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
    if r.DeleteFunc != nil {
        r.DeleteFunc(obj)
    }
}

六、Indexer源码分析

Indexer.png

2.Indexer的结构定义分析

2.1 Store

Store接口本身,定义了Add、Update、Delete、List、Get等一些对象增删改查的方法声明,用于操作informer的本地缓存items。

// staging/src/k8s.io/client-go/tools/cache/store.go
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)

    Replace([]interface{}, string) error
    Resync() error
}

2.2 Indexer

Indexer接口继承了一个Store接口(实现本地缓存),以及包含几个index索引相关的方法声明(实现索引功能)。

// staging/src/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
    Store

    Index(indexName string, obj interface{}) ([]interface{}, error)

    IndexKeys(indexName, indexedValue string) ([]string, error)

    ListIndexFuncValues(indexName string) []string

    ByIndex(indexName, indexedValue string) ([]interface{}, error)

    GetIndexers() Indexers

    AddIndexers(newIndexers Indexers) error
}

2.3 cache

cache是Indexer接口的一个实现,cache包含一个ThreadSafeStore接口的实现,以及一个计算object key的函数KeyFunc

cache会根据keyFunc生成某个obj对象对应的一个唯一key, 然后调用ThreadSafeStore接口中的方法来操作本地缓存中的对象

// staging/src/k8s.io/client-go/tools/cache/store.go
type cache struct {
    cacheStorage ThreadSafeStore
    keyFunc KeyFunc
}

2.4 ThreadSafeStore

ThreadSafeStore接口包含了操作本地缓存的增删改查方法以及索引功能的相关方法

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)

    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    AddIndexers(newIndexers Indexers) error
    Resync() error
}

2.5 threadSafeMap

threadSafeMap是ThreadSafeStore接口的一个实现,资源对象都存在items这个map中,key是根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
    lock  sync.RWMutex
  // 资源对象本地存储,key为对obj计算的key,value为obj本身
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

3.Indexer的索引功能

在threadSafeMap中,与索引功能有关的是indexers与indices属性;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}
// staging/src/k8s.io/client-go/tools/cache/index.go
const (
    NamespaceIndex string = "namespace"
)

type IndexFunc func(obj interface{}) ([]string, error)

type Indexers map[string]IndexFunc

type Indices map[string]Index

type Index map[string]sets.String

3.1 Indexers

Indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc计算obj索引键

Indexers: {  
  "索引器名称1": 索引函数1,
  "索引器名称2": 索引函数2,
}

数据示例:

Indexers: {  
  "namespace": MetaNamespaceIndexFunc, // 根据namespace建立索引,MetaNamespaceIndexFunc计算obj的索引值
}

3.2 Indices

Indices包含了所有索引器及其所有的索引数据Index;而Index则包含了索引键以及索引键下的所有对象键的列表;

Indices: {
 "索引器名称1": {  
  "索引键1": ["对象键1", "对象键2"],  
  "索引键2": ["对象键3"],   
 },
 "索引器名称2": {  
  "索引键3": ["对象键1"],  
  "索引键4": ["对象键2", "对象键3"],  
 }
}

数据示例:

pod1 := &v1.Pod {
    ObjectMeta: metav1.ObjectMeta {
        Name: "pod-1",
        Namespace: "default",
    },
    Spec: v1.PodSpec{
        NodeName: "node1",
    }
}

pod2 := &v1.Pod {
    ObjectMeta: metav1.ObjectMeta {
        Name: "pod-2",
        Namespace: "default",
    },
    Spec: v1.PodSpec{
        NodeName: "node2",
    }
}

pod3 := &v1.Pod {
    ObjectMeta: metav1.ObjectMeta {
        Name: "pod-3",
        Namespace: "kube-system",
    },
    Spec: v1.PodSpec{
        NodeName: "node2",
    }
}

往本地缓存indexer中存资源对象时,将其添加到如下索引结构中(将k8s资源对象按命名空间/节点名称等进行分类),通过该索引结构可以直接找到namespace是default的所有pod即pod-1,pod-2,不用去本地缓存indexer中取获取

Indices: {
    "namespace": {                          // 索引器名称namespace
        "default": ["pod-1", "pod-2"],  // 通过命名空间default索引映射到所有本地缓存的key
        "kube-system": ["pod-3"],      // 通过命名空间kube-system索引映射到所有本地缓存的key
    },
    "nodeName": {                           // 索引器名称nodeName
        "node1": ["pod-1"],                 // 使用索引函数得到资源对象的nodeName,
        "node2": ["pod-2", "pod-3"],  
    }
}

当items中添加obj时,MetaNamespaceIndexFunc计算出obj的namespace(default),将obj的key(default/pod-2)
当获取namespace(default)下资源对象时,返回相应key列表,并从items中根据key获取obj

4.Indexer本地缓存

前面对informer-Controller的分析中(代码如下),提到的s.indexer.Add、s.indexer.Update、s.indexer.Delete、s.indexer.Get等方法其实最终就是调用的threadSafeMap.Add、threadSafeMap.Update、threadSafeMap.Delete、threadSafeMap.Get等;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

4.1 threadSafeMap.Add

调用链:s.indexer.Add --> cache.Add --> threadSafeMap.Add

threadSafeMap.Add方法将key:object存入items中,并调用updateIndices方法更新索引Indices

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.lock.Lock()
    defer c.lock.Unlock()
    oldObject := c.items[key]
    c.items[key] = obj
    c.updateIndices(oldObject, obj, key)
}

也可以看到对threadSafeMap进行操作的方法,基本都会先获取锁,然后方法执行完毕释放锁,所以是并发安全的。

4.3 threadSafeMap.Delete

调用链:s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete

threadSafeMap.Delete方法中,先判断本地缓存items中是否存在该key,存在则调用deleteFromIndices删除相关索引,然后删除items中的key及其对应object;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
    c.lock.Lock()
    defer c.lock.Unlock()
    if obj, exists := c.items[key]; exists {
        c.deleteFromIndices(obj, key)
        delete(c.items, key)
    }
}

4.4 threadSafeMap.Get

调用链:s.indexer.Get --> cache.Get --> threadSafeMap.Get

threadSafeMap.Get方法逻辑相对简单,没有索引的相关操作,而是直接从items中通过key获取对应的object并返回;

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    item, exists = c.items[key]
    return item, exists
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容