[k8s源码分析][client-go] informer之controller和shared_informer(1)

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

源码位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)

[k8s源码分析][client-go] informer之store和index[k8s源码分析][client-go] informer之store和index[k8s源码分析][client-go] informer之reflector 的基础上进行分析, 接下来将会分析如何生成一个informer, 并且用户如何添加自己的逻辑, 与用户层越来越接近了.

2. 接口与类

这里先介绍后面需要用到的几个接口与结构体

architecture.png

2.1 ResourceEventHandler

// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

相信对这三个函数比较熟悉, 用户可以在这里定义自己的逻辑.

2.2 processorListener

type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}
    // 一个自定义处理数据的handler
    handler ResourceEventHandler
    // 一个环形的buffer 存着那些还没有被分发的notifications
    pendingNotifications buffer.RingGrowing
    // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
    requestedResyncPeriod time.Duration
    // informer's overall resync check period.
    resyncPeriod time.Duration
    // 下次要resync的时候
    nextResync time.Time
    resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
    ret := &processorListener{
        nextCh:                make(chan interface{}),
        addCh:                 make(chan interface{}),
        handler:               handler,
        pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
        requestedResyncPeriod: requestedResyncPeriod,
        resyncPeriod:          resyncPeriod,
    }
    ret.determineNextResync(now)
    return ret
}
func (p *processorListener) determineNextResync(now time.Time) {
    p.resyncLock.Lock()
    defer p.resyncLock.Unlock()
    // now加上该listener的resyncPeriod就是下次要resync的时间
    p.nextResync = now.Add(p.resyncPeriod)
}

关于buffer.NewRingGrowing是一个无限的循环数组, 无限的意思是当你想要在增加一个元素, 发现整个数组满了, 此时会进行扩容, 如果一直扩容, 会被OOM杀死.

关于shouldResyncsetResyncPeriod比较简单就不多说了. 这里说一下三个比较重要的方法add, poprun方法.

add

add方法是由上层程序调用的, 也就是往该listener发送了一个新的notification. 相当于生产者.

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
pop 和 run
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            // notification从缓冲区pendingNotifications中读 然后传递给nextCh
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { 
                // 如果notification还没有初始化 则进行初始化notification和nextCh
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                // 直接往pendingNotifications中写
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // 从nextCh读取并调用该listener的handler进行处理
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

poprun属于消费者, 消费从add方法中过来的notification, 但是为了防止处理速度(调用handler)跟不上生产速度, 设置了一个缓冲区pendingNotifications, 把从add中过来的notification先加入到pendingNotifications, 然后从pendingNotifications读取一个notification后, 将notification通过nextCh这个channel来进而传递给消费者run.

work_flow.png

2.3 sharedProcessor

type sharedProcessor struct {
    // 判断listeners有没有启动
    listenersStarted bool
    listenersLock    sync.RWMutex
    // 所有的processorListener
    listeners        []*processorListener
    // 所有的需要sync的processorListener 动态变化
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

这里sharedProcessor就是管理着所有的processorListener, 简单一点理解就是当拿到一个数据, 然后可以分发给所有的listeners.

resyncCheckPeriodChanged
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    for _, listener := range p.listeners {
        // 根据listener自己要求的requestedResyncPeriod和resyncCheckPeriod来决定该listener真正的resyncPeriod
        resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
        listener.setResyncPeriod(resyncPeriod)
    }
}
// 1. 如果desired或check其中一个是0 则返回0
// 2. 返回max(desired, check)
func determineResyncPeriod(desired, check time.Duration) time.Duration {
    if desired == 0 {
        return desired
    }
    if check == 0 {
        klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
        return 0
    }
    if desired < check {
        klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
        return check
    }
    return desired
}

resyncCheckPeriodChanged的作用是根据resyncCheckPeriod会重新生成一下每个listener自己的resyncPeriod.
对于每一个listener:
1. 如果自己要求的requestedResyncPeriod0或被要求的resyncCheckPeriod其中一个是0, 则返回0.
2. 则返回两个其中最大的一个.

shouldResync
func (p *sharedProcessor) shouldResync() bool {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()
    p.syncingListeners = []*processorListener{}
    resyncNeeded := false
    now := p.clock.Now()
    for _, listener := range p.listeners {
        if listener.shouldResync(now) {
            resyncNeeded = true
            p.syncingListeners = append(p.syncingListeners, listener)
            listener.determineNextResync(now)
        }
    }
    return resyncNeeded
}

可以看到该方法会重新生成syncingListeners, 遍历所有的listeners, 判断哪个已经到了resync时间, 如果到了就加入到syncingListeners中, 并且它的下一次resync的时间.

如果所有的listeners都没有到resync时间, 那该sharedProcessor对象的shouldResync会返回false. 否则会返回true.

run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        // 以goroutine的方式启动所有的listeners监听
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    // 等待有信号告知退出
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 关闭所有listener的addCh channel
    for _, listener := range p.listeners {
        // 通知pop()停止 pop()会告诉run()停止
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    // 等待所有的pop()和run()方法退出
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

run方法主要是启动所有的listener进行监听.

其余方法
func (p *sharedProcessor) addListener(listener *processorListener) {
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()

    p.addListenerLocked(listener)
    // 如果已经启动了
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

// 分发信息
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    if sync {
        // 如果是sync操作 只需要分发给那些resync时间到了的listener即可
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        // 如果不是sync操作 则通知所有的listeners
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

addListener: 表示增加一个processorListener, 如果sharedProcessor已经启动run方法了(listenersStarted=true), 那么就启动该listenerrunpop监控.

distribute: 分发消息, 也就是说sharedProcessor收到一个obj, 然后把该obj分发给它的listeners, 那么每个listeners都可以收到这个obj.

informer整体

整个informer体系在k8s代码中占有重要一环, 理解informer可以更好理解k8s的工作机制.

informer.png

1. [k8s源码分析][client-go] informer之store和index
2. [k8s源码分析][client-go] informer之delta_fifo
3. [k8s源码分析][client-go] informer之reflector
4. [k8s源码分析][client-go] informer之controller和shared_informer(1)
5. [k8s源码分析][client-go] informer之controller和shared_informer(2)
6. [k8s源码分析][client-go] informer之SharedInformerFactory

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351