[k8s源码分析][client-go] informer之SharedInformerFactory

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)

1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)

在前面分析的基础上, 本文将分析SharedInformerFactory, 这个是封装了NewSharedIndexInformer方法, 利用工厂模式来生成用户需要的informer类型, 比如PodInformer, NodeInformer等等. 在整个k8s的源码体系中, informer占有非常重要的位置, 几乎在各个组件中都有使用.

本文会涉及两个包client-go/informersclient-go/listers.

2. 例子

这是一个非常常规的例子, 也是非常惯用的用法.

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成一个SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成一个PodInformer
    podInformer := factory.Core().V1().Pods()
    // 获得一个cache.SharedIndexInformer 单例模式
    sharedInformer := podInformer.Informer()

    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一种方式
    // 可以这样启动  也可以按照下面的方式启动
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二种方式
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}

当前集群中的状态:

[root@master kubectl]# ./kubectl get nodes
NAME          STATUS   ROLES    AGE     VERSION
172.21.0.12   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
172.21.0.16   Ready    <none>   5d22h   v0.0.0-master+$Format:%h$
[root@master kubectl]# ./kubectl get pods --all-namespaces
NAMESPACE   NAME            READY   STATUS    RESTARTS   AGE
default     test            1/1     Running   0          4d4h
default     test-schduler   1/1     Running   0          4d4h
[root@master kubectl]# 

运行结果

[root@worker tming]# go run main.go 
add: test
add: test-schduler
list pods: test
list pods: test-schduler
update: test-schduler
update: test
update: test
update: test-schduler

可以看到用户可以利用NewSharedInformerFactory来创建用户需要的Informer, 比如例子中创建了一个PodInformer对象podInformer.

3. 源码分析

接下来将以上面的例子为主线来进行分析.

3.1 接口

// client-go/informers/internalinterfaces/factory_interfaces.go
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
type TweakListOptionsFunc func(*v1.ListOptions)

// client-go/informers/factory.go
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
    ...
    Core() core.Interface
    ...
}

这里不分析那么多的Interface, 因为都是大同小异, 所以只需要看core.Interface即可.

// client-go/informers/core/interface.go
type Interface interface {
    // V1 provides access to shared informers for resources in V1.
    V1() v1.Interface
}
// client-go/informers/core/v1/interface.go
type Interface interface {
    ...
    // Nodes returns a NodeInformer.
    Nodes() NodeInformer
    ...
    // Pods returns a PodInformer.
    Pods() PodInformer
    ...
}
// Pods returns a PodInformer.
func (v *version) Pods() PodInformer {
    return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

然后来看podInformer

podInformer
// 该接口有两个方法
// Informer 生成一个 cache.SharedIndexInformer对象
// Lister   生成一个 v1.PodLister对象
type PodInformer interface {
    Informer() cache.SharedIndexInformer
    Lister() v1.PodLister
}
// 接口的实现类
type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    namespace        string
}

该接口有两个方法
Informer()生成一个cache.SharedIndexInformer对象, 获得该对象后用户可以添加自己的ResourceEventHandler.
Lister() 生成一个v1.PodLister对象, 用户可以列出想要获取的元素.

Informer方法
// client-go/informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                // api-server的接口
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

Informer方法调用了工厂方法, 从工厂中获取.
工厂的逻辑是如果没有就用传入的方法生成一个, 如果有就直接方法
所以defaultInformer是用于第一次生成cache.SharedIndexInformer对象的.

indexer.png

这里的defaultInformer用到的是namespace这样的一个indexer, 那最终的结果就会如上图所示, 对于后面要说到Lister()有用, 因为该Lister()就是从本地缓存中取数据, 而不是直接去服务器端(k8s)上获得数据.

// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

1. 如果之前已经用newFunc生成过, 则直接返回对应的SharedIndexInformer
2. 如果没有生成过 则用newFunc生成并且保存到informers中(map结构) 然后返回
注意: 所以同一个sharedInformerFactory返回的podInformer一定是同一个(单例模式)

既然都已经获得了cache.SharedIndexInformer, 那就可以调用cache.SharedIndexInformer的方法比如AddEventHandler增加用户逻辑等等. 在 [k8s源码分析][client-go] informer之controller和shared_informer(2) 已经有详细分析.

Lister方法

看看该Lister()是如何实现的

// client-go/informers/core/v1/pod.go
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}

可以看到返回的是v1.PodLister对象, 用一个v1.NewPodLister方法返回. 可以猜得到v1.PodLister是一个接口, v1.NewPodLister返回一个该接口的实现类.

另外f.Informer()从上面分析过了, 获得一个cache.SharedIndexInformer对象, 而且是单例方法, 只要是同一个factory, 调用Informer最终返回的是同一个cache.SharedIndexInformer对象, 那么调用GetIndexer就是获得本地缓存, 也就是上面画的图, 可想而知, 该PodLister就是一个从本地缓存获取信息的Lister.

接下来看一下v1.PodLister的具体定义.

// client-go/listers/core/v1/pod.go
type PodLister interface {
    List(selector labels.Selector) (ret []*v1.Pod, err error)
    Pods(namespace string) PodNamespaceLister
    PodListerExpansion
}
type podLister struct {
    indexer cache.Indexer
}
func NewPodLister(indexer cache.Indexer) PodLister {
    return &podLister{indexer: indexer}
}

方法就不看了, 就是从indexer中获取元素, 如果加上了Selector, 就再加上点过滤.

3.2 factory方法

最后回到工厂类(client-go/informers/factory.go)的方法, Run方法和WaitForCacheSync方法.

// client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

启动所有注册的informers, 那什么时候注册的呢?
factory.Core().V1().Pods().Informer()的时候如果没有的时候会生成一个并放到f.informers中.

// client-go/informers/factory.go
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    // 收集所有已经启动的informers
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        // 等待同步完成
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}
// client-go/tools/cache/shared_informer.go
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    err := wait.PollUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        klog.V(2).Infof("stop requested")
        return false
    }

    klog.V(4).Infof("caches populated")
    return true
}

该方法是等待所有已经启动的informers完成同步. 因为不等到同步完成的时候, 本地缓存中是没有数据的, 如果直接就运行逻辑代码, 有些调用list方法就会获取不到, 因为服务器端是有数据的, 所以就会产生一定的偏差, 因此一般都是等到服务器端数据同步到本地缓存完了才开始运行用户自己的逻辑.

这也是为什么上面例子的第一种写法是需要等待2秒钟才调用list方法, 因为如果不sleep, 有可能获得的是空的.

informer整体

整个informer体系在k8s代码中占有重要一环, 理解informer可以更好理解k8s的工作机制.

informer.png

1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352