背景
Kubernetes 从 1.36 开始在 client-go 中引入了新的特性门控 AtomicFIFO(默认为 true)。开启后,Replace 和 Resync 不再为每个对象分别入队一个事件,而是只向队列添加单个 ReplacedAll / SyncAll 事件,使 client 可以原子地处理整批对象。
源码
tools/cache/controller.go
创建FIFO队列
// newQueueFIFO constructs the Queue implementation selected by the client-go
// feature gates. Only the InOrderInformers branch is visible in this excerpt;
// the code that runs when that gate is disabled is elided ("...").
func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
判断是否开启InOrderInformers特性
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
// Options common to both RealFIFO modes: keys come from MetaNamespaceKeyFunc,
// and transform, identifier and metricsProvider are passed through unchanged.
options := RealFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
Transformer: transform,
Identifier: identifier,
MetricsProvider: metricsProvider,
}
判断是否开启AtomicFIFO特性
// The two modes are mutually exclusive here: with AtomicFIFO enabled the queue
// is asked to emit atomic events and is not given the store; with it disabled
// the store is supplied as KnownObjects instead.
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.AtomicFIFO) {
options.AtomicEvents = true
} else {
options.KnownObjects = clientState
}
创建RealFIFO队列
return NewRealFIFOWithOptions(options)
}
...
}
tools/cache/the_real_fifo.go中
// NewRealFIFOWithOptions builds a RealFIFO from opts and returns it.
// Most of the constructor is elided in this excerpt; the part shown copies
// opts.AtomicEvents into the emitAtomicEvents field, which Replace and Resync
// branch on.
func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
...
f := &RealFIFO{
...
// Remember whether a single atomic event should be queued instead of
// per-object events.
emitAtomicEvents: opts.AtomicEvents,
...
}
...
return f
}
替换队列中的对象
// Replace enqueues the deltas that describe a full replacement with newItems
// at resourceVersion. Setup and the return path are elided in this excerpt;
// the visible part picks one of two strategies based on emitAtomicEvents.
func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error {
...
if f.emitAtomicEvents {
// Atomic mode: pass the whole list to addReplaceToItemsLocked, which queues
// it as one ReplacedAll delta.
err = f.addReplaceToItemsLocked(newItems, resourceVersion)
} else {
// Per-object mode: reconcileReplacement is given the queued items, the known
// objects, the new items and the key function, plus two callbacks that do the
// actual queueing.
// NOTE(review): presumably the first callback fires for objects absent from
// newItems and the second for each object in newItems — confirm against
// reconcileReplacement, which is not shown here.
err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf,
func(obj DeletedFinalStateUnknown) error {
// Queue a Deleted delta with skipTransform=true.
return f.addToItems_locked(Deleted, true, obj)
},
func(obj interface{}) error {
// Queue a Replaced delta with skipTransform=false.
return f.addToItems_locked(Replaced, false, obj)
})
}
...
}
已加锁(调用方需已持有锁),添加ReplacedAll事件到队列
// addReplaceToItemsLocked queues the complete replacement list as a single
// ReplacedAll delta instead of one delta per object. The start and end of the
// function (including its return) are elided in this excerpt.
// NOTE(review): the Locked suffix suggests the caller must already hold the
// FIFO's lock; no locking is visible in the part shown — confirm.
func (f *RealFIFO) addReplaceToItemsLocked(objs []interface{}, resourceVersion string) error {
...
构建ReplacedAllInfo对象
// The payload bundles the resource version with the full object list.
info := ReplacedAllInfo{
ResourceVersion: resourceVersion,
Objects: objs,
}
添加事件到队列
// One delta of type ReplacedAll carries the entire payload.
f.items = append(f.items, Delta{
Type: ReplacedAll,
Object: info,
})
通知等待的goroutine
// Wake all waiters on f.cond now that an item has been appended.
f.cond.Broadcast()
...
}
已加锁,添加事件到队列
// addToItems_locked appends one delta of type deltaActionType for obj to the
// queue and wakes all waiters on f.cond. The start and end of the function are
// elided in this excerpt, so the handling of skipTransform and the returned
// error are not visible here.
// NOTE(review): the _locked suffix suggests the caller must already hold the
// FIFO's lock — confirm against the full source.
func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error {
...
添加对象到队列
// Queue a single delta pairing the action type with the object.
f.items = append(f.items, Delta{
Type: deltaActionType,
Object: obj,
})
通知等待的goroutine
// Wake all waiters on f.cond now that an item has been appended.
f.cond.Broadcast()
...
}
执行Resync操作
// Resync enqueues sync deltas. Parts of the body are elided ("...") in this
// excerpt; the visible logic is:
//   - with emitAtomicEvents set, delegate to addResyncToItemsLocked, which
//     queues a single SyncAll delta;
//   - otherwise walk the keys reported by f.knownObjects and queue one Sync
//     delta per object (skipTransform=true).
//
// It returns a wrapped error when a per-object delta cannot be queued.
func (f *RealFIFO) Resync() error {
	...
	// Atomic mode: one SyncAll delta stands in for all per-object Sync deltas.
	if f.emitAtomicEvents {
		return f.addResyncToItemsLocked()
	}
	...
	// Per-object mode: queue a Sync delta for each known object.
	knownKeys := f.knownObjects.ListKeys()
	for _, knownKey := range knownKeys {
		...
		retErr := f.addToItems_locked(Sync, true, knownObj)
		if retErr != nil {
			// Wrap retErr, the error addToItems_locked actually returned. The
			// previous code wrapped err, a different variable that is not the
			// result of this call, so the real cause was dropped from the chain.
			return fmt.Errorf("couldn't queue object: %w", retErr)
		}
		...
	}
}
添加一个SyncAll事件到队列
// addResyncToItemsLocked appends one SyncAll delta, carrying an empty
// SyncAllInfo, to the queue and then wakes all waiters on f.cond.
// It always returns nil.
// NOTE(review): the Locked suffix suggests the caller must already hold the
// FIFO's lock; this function performs no locking itself.
func (f *RealFIFO) addResyncToItemsLocked() error {
	// A single marker delta stands in for per-object Sync deltas.
	syncAll := Delta{
		Type:   SyncAll,
		Object: SyncAllInfo{},
	}
	f.items = append(f.items, syncAll)

	// Notify waiters that a new item is available.
	f.cond.Broadcast()
	return nil
}