k8s go client Informer 结合源码分析

背景

k8s apiService 通过 http 对外暴露服务, go client 是 k8s 提供的一套 go语言实现的client lib,封装了通用的流程。
go client中比较重要的就是 Informer机制。本文就结合源码分析下go client informer的流程。

informer是什么

go client的本质就是通过http请求 apiService,但是实际需要考虑很多情况,比如
1:直接请求apiService,会导致apiService压力太大。
2:请求一个资源,要考虑监听资源的变动事件,客户端消费事件的性能。
3:重复开发等等
go client的目的就是提供通用的请求apiService的lib,考虑了 请求 k8s资源,监听k8s资源,性能优化等共性问题。
Informer机制解决了上面的一些问题。
informer机制如下


图1

该图反应了informer的工作机制,下面我们结合代码来解释下这张图。

分析

分析之前先说说怎么使用 go client
看一段代码,然后对代码段做说明。

package main

import (
    "flag"
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "path/filepath"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {

    var namespace, configFile string

    flag.StringVar(&namespace, "n", "memall", "Your Namespace ")
    //这里是设置配置文件为本地的    .kube/config 该文件是k8s服务器的相关连接配置
    flag.StringVar(&configFile, "config", filepath.Join("C:\\Users\\n\\", ".kube", "config"), "Config file ")
    flag.Parse()

    fmt.Printf("Running with kube config file %v\n", configFile)

    config, err := clientcmd.BuildConfigFromFlags("", configFile)
    if err != nil {
        panic(err.Error())
    }

    //创建 clientset, set意思就是集合,client集合,client go为不同的资源准备了不同的http client
    clientset, err := kubernetes.NewForConfig(config)

    //创建 SharedInformerFactory,用于创建 SharedIndexInformer
    informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*1)
        //创建 PodInformer:Pod的SharedIndexInformer通过PodInformer 创建
    podInformer := informerFactory.Core().V1().Pods()
       //设置监听响应函数
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

        AddFunc: func(obj interface{}) {
            //pod添加回调
            newPod := obj.(*v1.Pod)
            fmt.Printf("new pod added %v\n", newPod.GetName())
        },

        UpdateFunc: func(old, new interface{}) {
            //pod更新回调
            newPod := new.(*v1.Pod)
            fmt.Printf("new pod update %v\n", newPod.GetName())
        },

        DeleteFunc: func(new interface{}) {
             //pod删除回调
            newPod := new.(*v1.Pod)
            fmt.Printf("new pod deleted %v\n", newPod.GetName())
        },
    })

    stopper := make(chan struct{})
    defer close(stopper)
        //启动informer
    informerFactory.Start(stopper)
        //等待缓存同步
    informerFactory.WaitForCacheSync(wait.NeverStop)
    <-stopper
}

以上就是使用的过程
第一步设置k8s集群的连接配置(config)
第二步基于该配置创建SharedInformerFactory 和 podInformer
第三步通过 podInformer.Informer() 方法获得 Pod的 SharedIndexInformer
第四步为SharedIndexInformer设置监听
第五步 informerFactory.Start(stopper) 启动 informer
当启动informer是就进入了图1的流程
下面我们来具体看下。

创建 SharedInformerFactory,SharedInformerFactory用于创建 SharedIndexInformer

    informerFactory := informers.NewSharedInformerFactory(clientset, time.Minute*1)

这段代码创建了SharedInformerFactory,SharedInformerFactory 的作用如同它的名字 SharedIndexInformer 工厂,用来创建和管理 各种k8s资源的SharedIndexInformer,比如 专门用于获取和监听 Pod的资源的 SharedIndexInformer


图2

NewSharedInformerFactory 内部创建的sharedInformerFactory

  
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,
        defaultResync:    defaultResync,
               //这里就是图2表示的,informers管理了各种类型的SharedIndexInformer 
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),
        customResync:     make(map[reflect.Type]time.Duration),
    }

每个SharedIndexInformer都会进入各自的informer流程。这里用 用于Pod 的 SharedIndexInformer 做分析
用于 Pod 的 SharedIndexInformer 和 用于 Deployment 的 SharedIndexInformer 都是 SharedIndexInformer ,但他们请求的api资源是不一样的,所以代码上肯定有区别。
实际上sharedInformerFactory 创建 Pod的SharedIndexInformer 是委托给 PodInformer 的。创建 Deployment 的SharedIndexInformer 是委托给 DeploymentInformer

        //PodInformer 
    podInformer := informerFactory.Core().V1().Pods()
        //DeploymentInformer
       //deploymentInformer := informerFactory.Apps().V1().Deployments()
       //通过podInformer 的 Informer()方法创建SharedIndexInformer 
       podInformer.Informer().AddEventHandler(.....

Pod 的 SharedIndexInformer 就是通过调用podInformer Informer() 方法获取的。

//PodInformer创建SharedIndexInformer 
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
//Informer()方法实际是又回到了sharedInformerFactory,主要是获取已经存在的缓存,
//如果缓存没有会执行 defaultInformer,也就是上面的函数去创建一个SharedIndexInformer 
func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

接着NewFilteredPodInformer方法创建Pod的SharedIndexInformer

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                               //这里专用于请求Pods资源,可以看出 不同 SharedIndexInformer 的区别
                return client.CoreV1().Pods(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

对比下 创建DeploymentI的SharedIndexInformer

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                            //这里专用于请求Deploymens资源,可以看出 不同于 Pod SharedIndexInformer
                return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
            },
        },
        &appsv1.Deployment{},
        resyncPeriod,
        indexers,
    )
}

现在我们有了Pod的SharedIndexInformer,现在可以把它启动了。

informerFactory.Start(wait.NeverStop)

informerFactory的Start内部其实就是循环调用各个SharedIndexInformer.Run方法(启动)

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
                       //启动 SharedIndexInformer
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

可以看到前面主要是创建并初始化 Pod的SharedIndexInformer,然后调用 run()方法进入Informer机制。
在开始讲run()方法之前,先讲下 创建的SharedIndexInformer 都有哪些成员变量。


图3

图3中以SharedIndexInformer为中心,在SharedIndexInformer创建和run()运行过程中会初始化的主要对象。run方法的流程围绕这些对象展开。
先上图然后分析


图4

我们以中间的虚线分割,分上下2个部分分析。

SharedIndexInformer的Run方法会调用Controller 的run

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L443
s.controller.Run(stopCh)

//controller run方法会调用 Reflector 的 Run 方法和 自身的 processLoop 方法

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L153
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)

//processLoop 方法会走图4虚线下面的逻辑
//Reflector 的run方法 会调用 自身的 ListAndWatch

https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L221
if err := r.ListAndWatch(stopCh); err != nil {...}

ListViewAndWatch内部很长,作用见方法的描述。这里主要看Watch和Watch到事件后的回调 (也是图1中的 1List & Watch逻辑 和 2 Add Object)

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L415
        w, err := r.listerWatcher.Watch(options) //Watch Pod资源
//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/reflector.go#L429
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {...}//事件回调

listerWatcher.Watch方法会去请求 apiService的pod资源 (Watch是通过http 1.1 持久连接特性)
watchHandler 会将事件添加到 DeltaFIFO (队列)中。

上面事件添加到 DeltaFIFO中,下面就是去消费了(也是图1中 3Pop Object)。
processLoop 方法会走图4虚线下面的逻辑。
processLoop 看名字就是循环,循环消费 DeltaFIFO。

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L184
    func (c *controller) processLoop() {
    for {

        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))

实际消费的处理函数是 PopProcessFunc 该函数指向 SharedIndexInformer 的 HandleDeltas函数。

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L566
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    if deltas, ok := obj.(Deltas); ok {
        return processDeltas(s, s.indexer, s.transform, deltas)
    }
    return errors.New("object given as Process argument is not Deltas")
}

processDeltas会将事件添加到Indexer。

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/controller.go#L438
        case Sync, Replaced, Added, Updated:
                       //clientState.Get(obj);就是图1  9 get Object for Key的操作,通过key获取到old
            if old, exists, err := clientState.Get(obj); err == nil && exists {
                if err := clientState.Update(obj); err != nil {
                    return err
                }
                handler.OnUpdate(old, obj)
            } else {
                if err := clientState.Add(obj); err != nil {
                    return err
                }
                handler.OnAdd(obj)
            }

hanlder.OnAdd(obj) 实际调用 sharedIndexInformer 的 OnAdd。


//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L577
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
    // Invocation of this function is locked under s.blockDeltas, so it is
    // save to distribute the notification
    s.cacheMutationDetector.AddObject(obj)
        //最后到processor 的 distribute,distribute 方法将事件发送到processorListener的 addCh(Chan)
    s.processor.distribute(addNotification{newObj: obj}, false)
}

distribute 方法将事件发送到processorListener的 addCh(Chan)。 用于下面processorListener 去接收
这里补下前提:processorListener 在启动的时候就会启动每个 Listener(processorListener),主要是 processorListener 的run方法和pop方法,用于接收 addCh 的数据和消费数据。

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L664
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)

Pop方法是接收事件(从addCh接收),run方法是消费事件。
processorListener 是对 AddEventHandler 调用参数 Listener的封装 。

//https://github.com/kubernetes/client-go/blob/2f52a105e63e9ac7affb1a0f533c62883611ba81/tools/cache/shared_informer.go#L545
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

所以 processorListener 的run方法消费事件后会将事件回调给 客户端,比如最开头的使用例子

       //设置监听响应函数
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

        AddFunc: func(obj interface{}) {
            //pod添加回调(processorListener 的run方法消费事件后会将事件回调)
            newPod := obj.(*v1.Pod)
            fmt.Printf("new pod added %v\n", newPod.GetName())
        },

最后在提下 processorListener pop方法
case nextCh <- notification: //case 1
case notificationToAdd, ok := <-p.addCh:// case 2
这里2个case
事件接收比消费快时,事件会进入pendingNotifications(环形可增长的buff).
当 pendingNotifications没有数据时 case 1会停止(nextCh = nil // Disable this select case)
当addCh 来数据后,会重新开启case1 ( nextCh = p.nextCh)

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            fmt.Printf(" nextCh in")
            var ok bool
            fmt.Printf(" ReadOne \n")
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                fmt.Printf(" nextCh nil \n ")
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                fmt.Printf(" nextCh change \n")
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                fmt.Printf(" WriteOne \n")
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

以上通过 Pod特例简单分析了下informer机制。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容