Kubernetes 源码解析 - Informer 的工作原理

上篇扒了 HPA 的源码,但是没深入细节,今天往细节深入。

为什么要有 Informer?

Kubernetes 中的持久化数据保存在 etcd中,各个组件并不会直接访问 etcd,而是通过 api-server暴露的 RESTful 接口对集群进行访问和控制。

资源的控制器(图中右侧灰色的部分)读取数据也并不会直接从 api-server 中获取资源信息(这样会增加 api-server 的压力),而是从其“本地缓存”中读取。这个“本地缓存”只是表象的存在,加上缓存的同步逻辑就是今天要是说的Informer(灰色区域中的第一个蓝色块)所提供的功能。

从图中可以看到 Informer 的几个组件:

  • Reflector:与 api-server交互,监听资源的变更。
  • Delta FIFO Queue:增量的 FIFO 队列,保存 Reflector 监听到的资源变更(简单的封装)。
  • Indexer:Informer 的本地缓存,FIFO 队列中的数据根据不同的变更类型,在该缓存中进行操作。
    • Local Store:

上篇 提到了水平自动伸缩的控制器HorizontalController,其构造方法就需要提供 Informer

//pkg/controller/podautoscaler/horizontal.go

type HorizontalController struct {
    scaleNamespacer scaleclient.ScalesGetter
    hpaNamespacer   autoscalingclient.HorizontalPodAutoscalersGetter
    mapper          apimeta.RESTMapper
    replicaCalc   *ReplicaCalculator
    eventRecorder record.EventRecorder
    downscaleStabilisationWindow time.Duration
    hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
    hpaListerSynced cache.InformerSynced
    podLister       corelisters.PodLister
    podListerSynced cache.InformerSynced
    queue workqueue.RateLimitingInterface
    recommendations map[string][]timestampedRecommendation
}

func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    //从HorizontalPodAutoscalerInformer 获取hpa 实例信息
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    //从PodInformer 中获取 pod 信息
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,

) *HorizontalController {
    ......
        hpaInformer.Informer().AddEventHandlerWithResyncPeriod( //添加事件处理器
        cache.ResourceEventHandlerFuncs{
            AddFunc:    hpaController.enqueueHPA,
            UpdateFunc: hpaController.updateHPA,
            DeleteFunc: hpaController.deleteHPA,
        },
        resyncPeriod,
    )
    ......
}

type HorizontalPodAutoscalerInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.HorizontalPodAutoscalerLister
}

HorizontalPodAutoscalerInformer的实例化方法中就出现了今天的正主cache.NewSharedIndexInformer()

//staging/src/k8s.io/client-go/informers/autoscaling/v1/horizontalpodautoscaler.go
func NewFilteredHorizontalPodAutoscalerInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
           //用于 list 和 watch api-server 中的资源。比如用来创建 Reflector
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                //使用 HPA API 获取 HPA资源
                return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                //使用 HPA API 监控 HPA资源
                return client.AutoscalingV1().HorizontalPodAutoscalers(namespace).Watch(options)
            },
        },
        &autoscalingv1.HorizontalPodAutoscaler{},
        resyncPeriod,
        indexers,
    )
}

初始化

Informer

//staging/src/k8s.io/client-go/tools/cache/index.go
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

实例化 Indexers cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
// ListerWatcher 用于 list 和watch api-server 上的资源
//runtime.Object要监控的资源的运行时对象
//time.Duration同步的间隔时间
//Indexers 提供不同资源的索引数据的信息查询方法,如 namespace => MetaNamespaceIndexFunc
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), //初始化 Indexer
        listerWatcher:                   lw,
        objectType:                      objType,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

Indexer

Indexer提供了本地缓存的实现:计算 key 和对数据进行控制(通过调用ThreadSafeStore的接口)

type Indexer interface {
    Store
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(indexName string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
}

Indexer 的创建

//staging/src/k8s.io/client-go/tools/cache/store.go
//keyFunc:key 的生成规则
//indexers:提供了索引资源的不同信息的访问方法,如用于查询命名空间的 namespace => MetaNamespaceIndexFunc
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        keyFunc:      keyFunc,
    }
}

ThreadSafeStore

ThreadSafeStore提供了对存储的并发访问接口

注意事项:不能修改Get或List返回的任何内容,因为它不仅会破坏线程安全,还会破坏索引功能。

//staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        items:    map[string]interface{}{},
        indexers: indexers,
        indices:  indices,
    }
}
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{} //key => value
    indexers Indexers //value 的信息的访问方法
    indices Indices //索引
}

Reflector

Reflector通过ListerWatcher(API)与api-server交互,对资源进行监控。将资源实例的创建、更新、删除等时间封装后保存在Informer的FIFO 队列中。

//staging/src/k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store, //FIFO队列
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    r.setExpectedType(expectedType)
    return r
}

添加同步事件监听器

通过sharedIndexInformer#AddEventHandlerWithResyncPeriod()注册事件监听器。

以前面的 HorizontalController为例,创建 informer 的时候添加了三个处理方法:AddFuncUpdateFuncDeleteFunc。这三个方法的实现是将对应的元素的 key(固定格式 namespace/name)从workequeue中进行入队、出队的操作。(资源控制器监听了该 workqueue

运行

controller-manager

在通过InformerFactory创建Informer完成后,都会将新建的Informer加入到InformerFactory的一个map中。

controller-manager在完成所有的控制器(各种Controller,包括 CRD)后,会调用InformerFactory#Start()来启动InformerFactorymap中的所有Informer(调用Informer#Run()方法)

sharedIndexInformer#Run()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
      //创建一个增量的 FIFO队列:DeltaFIFO
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }
     //启动前的初始化,创建 Controller
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)
     //退出时的状态清理
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    //实行控制逻辑
    s.controller.Run(stopCh)
}

controller#Run()

//staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    //创建一个 Reflector,用于从 api-server list 和 watch 资源
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock
      //为 controller 指定 Reflector
    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
    var wg wait.Group
    defer wg.Wait()
     //执行Reflector#Run():会启动一个goroutine开始监控资源,将 watch 到的数据写入到queue(FIFO 队列)中
    wg.StartWithChannel(stopCh, r.Run)
      //持续从 queue(FIFO 队列) 获取数据并进行处理,处理的逻辑在sharedIndexInformer#HandleDeltas()
    wait.Until(c.processLoop, time.Second, stopCh)
}

sharedIndexInformer#HandleDeltas()

//staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // from oldest to newest
    for _, d := range obj.(Deltas) { //循环处理 FIFO 队列中取出的资源实例
        switch d.Type {
        case Sync, Added, Updated: //同步(后面详细解读)、新增、更新事件
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil { //如果 indexer 中已经存在,更掉用 update 方法进行更新
                    return err
                }
                //更新成功后发送“更新”通知:包含了新、旧资源实例
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                  //如果 indexer 中没有该资源实例,则放入 indexer 中
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                //添加成功后,发送“新增”通知:包含了新加的资源实例
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted: //删除事件
            if err := s.indexer.Delete(d.Object); err != nil {//从 indexer 中删除
                return err
            }
            //删除成功后,发送“删除通知”:包含了删除的资源实例
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

总结

Informer 的实现不算复杂,却在 Kubernetes 中很常见,每种资源的控制也都通过 Informer 来获取api-server的资源实例的变更。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容