从源码看k8s特性AtomicFIFO

背景

1.36后k8s引入了一个新特性AtomicFIFO,默认为true,让client获得单个事件而不是多个事件从而原子的处理

源码

tools/cache/controller.go

创建FIFO队列
func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
    判断是否开启InOrderInformers特性
    if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
        options := RealFIFOOptions{
            KeyFunction:     MetaNamespaceKeyFunc,
            Transformer:     transform,
            Identifier:      identifier,
            MetricsProvider: metricsProvider,
        }
        判断是否开启AtomicFIFO特性
        if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) {
            options.AtomicEvents = true
        } else {
            options.KnownObjects = clientState
        }
        创建RealFIFO队列
        return NewRealFIFOWithOptions(options)
    }
    ...
}

tools/cache/the_real_fifo.go中


func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
    ...
    f := &RealFIFO{
        ...
        emitAtomicEvents:      opts.AtomicEvents,
        ...
    }
    ...
    return f
}



替换队列中的对象
func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error {
    ...
    if f.emitAtomicEvents {
        err = f.addReplaceToItemsLocked(newItems, resourceVersion)
    } else {
        err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf,
            func(obj DeletedFinalStateUnknown) error {
                return f.addToItems_locked(Deleted, true, obj)
            },
            func(obj interface{}) error {
                return f.addToItems_locked(Replaced, false, obj)
            })
    }
    ...
}

加锁,添加ReplacedAll事件到队列
func (f *RealFIFO) addReplaceToItemsLocked(objs []interface{}, resourceVersion string) error {
    ...
    构建ReplacedAllInfo对象
    info := ReplacedAllInfo{
        ResourceVersion: resourceVersion,
        Objects:         objs,
    }
    添加事件到队列
    f.items = append(f.items, Delta{
        Type:   ReplacedAll,
        Object: info,
    })
    通知等待的goroutine
    f.cond.Broadcast()
    ...
}


已加锁,添加事件到队列
func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error {
    ...
    添加对象到队列
    f.items = append(f.items, Delta{
        Type:   deltaActionType,
        Object: obj,
    })
    通知等待的goroutine
    f.cond.Broadcast()
    ...
}

执行Resync操作
func (f *RealFIFO) Resync() error {
    ...
    如果是AtomicFIFO,添加ResyncAll事件到队列
    if f.emitAtomicEvents {
        return f.addResyncToItemsLocked()
    }
    ...
    逐个事件执行Sync操作
    knownKeys := f.knownObjects.ListKeys()
    for _, knownKey := range knownKeys {
        ...
        retErr := f.addToItems_locked(Sync, true, knownObj)
        if retErr != nil {
            return fmt.Errorf("couldn't queue object: %w", err)
        }
        ...
    }
}


添加一个SyncAll事件到队列
func (f *RealFIFO) addResyncToItemsLocked() error {
    添加SyncAll事件到队列
    f.items = append(f.items, Delta{
        Type:   SyncAll,
        Object: SyncAllInfo{},
    })
    通知等待的goroutine
    f.cond.Broadcast()

    return nil
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容