I. Overview of the client-go informer
As the diagram shows, the k8s informer is made up of the following parts:
1. Reflector
(1) The Reflector calls the kube-apiserver list API to fetch the full list of resource objects, then calls DeltaFIFO's Replace method, which wraps each object as a Sync/Deleted-type Delta and pushes it into the DeltaFIFO;
(2) The Reflector calls the kube-apiserver watch API to watch for changes to resource objects, then calls DeltaFIFO's Add/Update/Delete methods, which wrap each object as an Added/Updated/Deleted-type Delta and push it into the DeltaFIFO;
2. DeltaFIFO
The DeltaFIFO stores a map and a queue;
(1) The queue is a first-in, first-out queue of object keys; when an object enters the DeltaFIFO, its key is appended to the tail of the queue only if it is not already there;
(2) The map is map[object key]Deltas, mapping an object key to its Deltas; Deltas is a slice of Delta, and a Delta stores a DeltaType together with the object;
There are four DeltaTypes: Added, Updated, Deleted, and Sync.
3. Controller
The Controller pops an object key from the DeltaFIFO queue, fetches the corresponding Deltas from the DeltaFIFO map, and processes them: it iterates over the Deltas, updates the Indexer local cache according to each change type, and notifies the Processor that a change event occurred for the object:
(1) If the DeltaType is Deleted, it calls the Indexer's Delete method to remove the object from the Indexer local cache, constructs a deleteNotification, and notifies the Processor;
(2) If the DeltaType is Added/Updated/Sync, it calls the Indexer's Get method to look the object up in the local cache; if it exists, it calls the Indexer's Update method to update the cached object, constructs an updateNotification, and notifies the Processor; if it does not exist, it calls the Indexer's Add method to store the object in the local cache, constructs an addNotification, and notifies the Processor;
4. Processor
Based on the Controller's notification, i.e. the change event type (addNotification, updateNotification, deleteNotification), the Processor invokes the corresponding ResourceEventHandler (AddFunc, UpdateFunc, DeleteFunc), which typically pushes the object key into a workqueue for the custom controller to process (a sketch of such handlers follows the example code below).
5. Indexer
The Indexer is the local cache of resource objects; it is backed by threadSafeMap's items, indexers, and indices.
items: a map that stores all resource objects locally; the key is the object's namespace/name and the value is the resource object itself.
indexers, indices: the index structures (indexers map an index name to an index function; indices hold the computed index data).
6. ResourceEventHandler
The corresponding ResourceEventHandler (AddFunc, UpdateFunc, DeleteFunc) is invoked to push the object key into the workqueue for the custom controller to process.
7. Example code: using an informer
func main() {
// path to the kubeconfig used to talk to kube-apiserver (note: Go does not expand "~", use an absolute path in practice)
kubeconfig := "~/.kube/config"
// ① build the config for talking to kube-apiserver
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
// ② initialize the clientset used to talk to the apiserver
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// ③ initialize the shared informer factory
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// every resource type has its own informer, e.g. PodInformer, which exposes two methods: Informer() (initialize the informer) and Lister() (create a podLister)
podInformer := factory.Core().V1().Pods()
// ④ initialize the pod informer and register it in the factory
informer := podInformer.Informer()
// ⑤ register custom ResourceEventHandlers on the informer
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
stopCh := make(chan struct{})
defer close(stopCh)
// ⑥ start the shared informer factory, which makes every informer in the factory list and watch from the apiserver
factory.Start(stopCh)
// ⑦ wait for the indexer cache of every started informer to be synced; each k8s resource type has its own informer, and the podLister then reads objects from the indexer cache
factory.WaitForCacheSync(stopCh)
// ⑧ call the Lister() method to create a lister and read resource objects from the informer's indexer local cache
podLister := podInformer.Lister()
podList, err := podLister.List(labels.Everything())
for _, pod := range podList {
klog.Infof("podName: %s", pod.Name)
}
<-stopCh
}
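The example above registers onAdd, onUpdate, and onDelete without defining them. Below is a minimal sketch of what such handlers typically look like (an assumption, not part of the original example): they compute an object key and push it into a workqueue for the custom controller. It assumes the packages k8s.io/client-go/util/workqueue and k8s.io/client-go/tools/cache are imported.
var queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

func onAdd(obj interface{}) {
	// compute the namespace/name key and hand it to the custom controller
	if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
		queue.Add(key)
	}
}

func onUpdate(oldObj, newObj interface{}) {
	if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
		queue.Add(key)
	}
}

func onDelete(obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc also copes with DeletedFinalStateUnknown tombstones
	if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
		queue.Add(key)
	}
}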
II. Informer initialization and startup
1. SharedInformerFactory initialization
// staging/src/k8s.io/client-go/informers/factory.go
type sharedInformerFactory struct {
// the client used to talk to k8s: the clientset
client kubernetes.Interface
namespace string
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
// key is the resource object type, value is the sharedIndexInformer for that resource
informers map[reflect.Type]cache.SharedIndexInformer
// records which informers have already been started
startedInformers map[reflect.Type]bool
}
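The namespace, defaultResync, and customResync fields are normally filled in through functional options rather than set directly. A hedged sketch (assuming imports of time, k8s.io/client-go/kubernetes, and k8s.io/client-go/informers; informers.WithCustomResyncConfig exists as well for per-type resync periods):
func newScopedFactory(clientset kubernetes.Interface) informers.SharedInformerFactory {
	return informers.NewSharedInformerFactoryWithOptions(
		clientset,
		30*time.Second,                     // becomes defaultResync
		informers.WithNamespace("default"), // becomes the namespace field, scoping every informer in the factory
	)
}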
2. Informer initialization
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// local in-memory cache of resource objects in the Indexer; reading from it reduces request pressure on the apiserver
indexer Indexer
// the Controller pops Deltas from the DeltaFIFO, updates the local cache in the Indexer according to the object changes, and notifies the Processor
controller Controller
// the Processor invokes the matching ResourceEventHandler according to the object's change event type
processor *sharedProcessor
// fetches resource objects from the apiserver and stores them into the DeltaFIFO
listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
3. Starting the sharedInformerFactory
The sharedInformerFactory.Start method starts every informer registered in the factory (see the sketch after the code).
// staging/src/k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
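Because startedInformers records what has already been started, Start only launches informers that were requested since the last call, so calling it repeatedly is safe. A small sketch of that pattern (assuming the same imports as the example in part I):
func startIncrementally(factory informers.SharedInformerFactory, stopCh <-chan struct{}) {
	factory.Start(stopCh) // starts whatever informers have been requested so far

	// Requesting another informer after Start merely registers it in the factory...
	factory.Core().V1().Services().Informer()

	// ...and a second Start launches only that newly registered informer.
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}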
4. sharedIndexInformer.Run (the core)
sharedIndexInformer.Run starts the informer; its main logic is:
(1) call NewDeltaFIFO to initialize the DeltaFIFO;
(2) build the Config struct; note the Process field, which is assigned s.HandleDeltas and is analyzed later;
(3) call New with the Config struct to initialize the controller;
(4) call s.processor.run to start the processor;
(5) call s.controller.Run to start the controller.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1. initialize the DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 2. build the Config struct
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// the callback invoked for each popped item
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 3. initialize the controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 4. start the processor, which starts listener.run and calls listener.pop for each listener
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5. start the controller: initialize and start the Reflector, then call c.processLoop, the controller's core processing loop
s.controller.Run(stopCh)
}
4.1 Starting the controller
4.1.1 controller.Run
① initialize and start the Reflector;
② c.processLoop is the controller's core processing loop: it pops Deltas from the DeltaFIFO, updates the Indexer local cache according to the object changes, and notifies the Processor that change events have occurred.
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. initialize the Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// 2. start the Reflector
wg.StartWithChannel(stopCh, r.Run)
// 3. c.processLoop is the controller's core processing: pop Deltas from the DeltaFIFO, update the Indexer local cache according to the object changes, and notify the Processor that change events have occurred
wait.Until(c.processLoop, time.Second, stopCh)
}
4.1.2 controller.processLoop
processLoop repeatedly calls c.config.Queue.Pop to pop the head element out of the DeltaFIFO (what is actually popped is a Deltas value, a slice of Delta) and then processes the Deltas with c.config.Process, which is s.HandleDeltas.
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // PopProcessFunc(c.config.Process) converts c.config.Process to the PopProcessFunc type
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
4.1.3 sharedIndexInformer.HandleDeltas
c.config.Process is the s.HandleDeltas method.
The main logic of HandleDeltas:
(1) iterate over the Deltas, taking one Delta at a time;
(2) switch on the Delta's type;
(3) for Added, Updated, or Sync, look the object up in the indexer: if it exists, call s.indexer.Update to update it in the indexer, then construct an updateNotification and call s.processor.distribute; if it does not exist, call s.indexer.Add to add it to the indexer, then construct an addNotification and call s.processor.distribute;
(4) for Deleted, call s.indexer.Delete to remove the object from the indexer, then construct a deleteNotification and call s.processor.distribute.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
III. Reflector source code analysis
1. Reflector overview
The Reflector has two core responsibilities:
(1) List & Watch
The Reflector lists and watches resource objects from kube-apiserver, then wraps each object change as a Delta and stores it in the DeltaFIFO.
(2) Wrapping object changes as Deltas and pushing them into the DeltaFIFO
The Reflector first fetches the full set of resource objects with a List and inserts them all at once via DeltaFIFO's Replace method; afterwards, for each change type reported by the Watch, it calls DeltaFIFO's Add, Update, or Delete method to insert the object and its change into the DeltaFIFO.
2. Reflector initialization and startup
2.1 The Reflector struct
First look at the Reflector struct; the key fields are:
① expectedType: the type of object to be placed into the Store (i.e. the DeltaFIFO);
② store: assigned the DeltaFIFO;
③ listerWatcher: responsible for the list and watch operations.
// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
2.2 Reflector initialization
// k8s.io/client-go/tools/cache/reflector.go
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
2.3 ListerWatcher
ListerWatcher defines the Reflector's two core methods, List and Watch, used to fetch the full set of resource objects and to monitor changes to them.
// k8s.io/client-go/tools/cache/listwatch.go
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
type ListerWatcher interface {
Lister
Watcher
}
2.4 ListWatch
ListWatch implements the ListerWatcher interface.
// k8s.io/client-go/tools/cache/listwatch.go
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
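Besides the generated informer code shown in the next subsection, a ListWatch can also be built by hand with cache.NewListWatchFromClient. A sketch (assuming imports of k8s.io/apimachinery/pkg/fields, metav1 "k8s.io/apimachinery/pkg/apis/meta/v1", k8s.io/client-go/kubernetes, and k8s.io/client-go/tools/cache):
// newPodListWatch builds a ListWatch for pods in all namespaces by hand.
func newPodListWatch(clientset kubernetes.Interface) *cache.ListWatch {
	return cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), // Getter for the core/v1 API group
		"pods",                          // resource name
		metav1.NamespaceAll,             // all namespaces
		fields.Everything(),             // no field selector
	)
}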
2.5 ListWatch initialization
In NewDeploymentInformer, which initializes the informer for the Deployment object, a ListWatch is created and its ListFunc and WatchFunc are defined:
// staging/src/k8s.io/client-go/informers/apps/v1beta1/deployment.go
func NewDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredDeploymentInformer(client, namespace, resyncPeriod, indexers, nil)
}
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// Reflector#ListerWatcher
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1beta1().Deployments(namespace).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1beta1().Deployments(namespace).Watch(options)
},
},
&appsv1beta1.Deployment{},
resyncPeriod,
indexers,
)
}
2.6 Starting the Reflector
Run mainly calls r.ListAndWatch in a loop via wait.Until:
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
3. Reflector's core processing method: ListAndWatch
The main logic of ListAndWatch falls into three parts:
(1) the List operation (executed only once):
a. call r.listerWatcher.List to fetch the full set of resource objects;
b. call r.syncWith, which calls r.store.Replace to update the DeltaFIFO;
(2) the Resync operation (executed periodically):
a. decide whether a resync is needed;
b. if so, call r.store.Resync to update the DeltaFIFO;
(3) the Watch operation:
a. call r.listerWatcher.Watch to start watching;
b. call r.watchHandler to process the channel returned by the watch.
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
// ResourceVersion here is the k8s resource object's ResourceVersion field; kubernetes bumps it whenever the object is created/updated/deleted. The client keeps track of the resourceVersion so that when it changes elsewhere the client knows the object has changed and needs to be re-fetched
options := metav1.ListOptions{ResourceVersion: "0"}
// 1. the List operation (executed only once)
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// 1.1 call r.listerWatcher.List to perform the list, i.e. fetch the full set of resource objects
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// convert the list result into a []runtime.Object
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// 1.2 call r.syncWith, which calls r.store.Replace to replace the objects in the DeltaFIFO with the objects listed from the apiserver
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
// 2. the Resync operation (executed periodically)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan() // resyncCh fires periodically
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// 2.1 decide whether a resync is needed
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// 2.2 if so, call r.store.Resync to update the DeltaFIFO
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 3. the Watch operation
for {
// check whether the loop should exit
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// 3.1 call r.listerWatcher.Watch to start watching
w, err := r.listerWatcher.Watch(options)
if err != nil {
//...
}
// 3.2 call r.watchHandler to process the channel returned by the watch and update the DeltaFIFO
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
//...
}
return nil
}
}
}
For the list operation, resourceVersion can be set in three ways (the sketch below shows the corresponding ListOptions):
(1) unset: the request is served directly from etcd via a quorum read, so the data is the freshest;
(2) set to "0": the list may be served from the apiserver cache;
(3) set to a specific resourceVersion: the returned data is at least as new as that version.
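For illustration only, the three cases correspond to the following ListOptions (assuming metav1 is k8s.io/apimachinery/pkg/apis/meta/v1; "12345" is just a placeholder version):
func listOptionsExamples() []metav1.ListOptions {
	return []metav1.ListOptions{
		{},                         // (1) unset: quorum read from etcd, freshest data
		{ResourceVersion: "0"},     // (2) "0": may be served from the apiserver watch cache
		{ResourceVersion: "12345"}, // (3) a concrete version: data at least as new as that version
	}
}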
3.1 r.syncWith
r.syncWith mainly calls r.store.Replace, i.e. it replaces the store's items with the list result.
// k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
3.2 r.watchHandler
r.watchHandler processes the results returned by the watch operation; it loops over the following steps until the event stream is exhausted:
(1) receive an event from the channel returned by the watch;
(2) if the channel has been closed, exit the loop;
(3) for watch.Added, watch.Modified, and watch.Deleted events, call r.store.Add, r.store.Update, and r.store.Delete respectively (the r.store.xxx methods are analyzed in detail in the DeltaFIFO section below);
(4) call r.setLastSyncResourceVersion to record the resourceVersion of the most recently processed object.
// k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
// 1. receive an event from the channel returned by the watch
case event, ok := <-w.ResultChan():
// 2. if the channel has been closed, exit the loop
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if r.expectedType != nil {
// check that the object is of the expected type
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
// check that the group/version/kind matches
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
// 3. for watch.Added/watch.Modified/watch.Deleted events, call r.store.Add/r.store.Update/r.store.Delete respectively, which add the object to the DeltaFIFO as an Added/Updated/Deleted Delta
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
IV. DeltaFIFO source code analysis
1. DeltaFIFO overview
In DeltaFIFO, the FIFO is a first-in, first-out queue, while a Delta holds a resource object together with its change type.
The definition of Delta:
type Delta struct {
Type DeltaType
Object interface{}
}
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Sync DeltaType = "Sync"
)
The (abridged) definition of DeltaFIFO:
type DeltaFIFO struct {
// {obj1key: [Delta{Added,obj},Delta{Updated,obj}]}
items map[string]Deltas
// [obj1key, obj2key, ...]
queue []string
...
}
type Deltas []Delta
The Reflector writes into the DeltaFIFO, and the Controller reads from it.
Each object maps to a unique object key, which maps to one Deltas slice, so every object has exactly one Deltas.
A Delta has one of four Types: Added, Updated, Deleted, or Sync. For the same object there may be several Deltas of different Types in its slice, each recording a different operation on that object, as the runnable sketch below illustrates.
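A runnable sketch of that behaviour, written against the client-go version this post analyzes (newer releases have changed some of these signatures; the pod name is arbitrary):
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}

	// Two writes to the same object accumulate in one Deltas slice under one key.
	fifo.Add(pod)
	fifo.Update(pod)

	// Pop hands the accumulated Deltas for "default/nginx" to the process callback.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Printf("%v %T\n", d.Type, d.Object) // Added *v1.Pod, then Updated *v1.Pod
		}
		return nil
	})
}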
2. DeltaFIFO definition and initialization
2.1 DeltaFIFO
The DeltaFIFO struct defines the fields of a DeltaFIFO; the important ones are:
items: a map whose key is computed from the object and whose value is of type Deltas;
queue: a queue of object keys.
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// items is a map: key is the object key, value is of type Deltas
items map[string]Deltas
// queue stores the object keys
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}
2.2 DeltaFIFO initialization
NewDeltaFIFO returns a DeltaFIFO whose items and queue are both empty.
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
// knownObjects = threadSafeMap
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
3. DeltaFIFO's core methods
The Reflector's core processing calls r.store.Replace, r.store.Add, r.store.Update, and r.store.Delete; since r.store inside the Reflector is the DeltaFIFO, these are just DeltaFIFO's Replace, Add, Update, and Delete methods.
When the apiserver list API is called to fetch the full set of resource objects, each object is added to the DeltaFIFO with type Sync, e.g. wrapped as Delta{Type: Sync, Object: xxx}.
When the apiserver watch API reports resource object events, the object is added to the DeltaFIFO with type Added/Updated/Deleted.
sharedIndexInformer.Run calls NewDeltaFIFO to initialize the DeltaFIFO and then passes it into the Config:
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
...
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
...
}
func() {
...
s.controller = New(cfg)
...
}()
...
s.controller.Run(stopCh)
}
In the controller's Run method, when NewReflector is called to initialize the Reflector, the DeltaFIFO created earlier is passed in and assigned to the Reflector's store field. So r.store inside the Reflector is the DeltaFIFO, and the r.store.Replace/Add/Update/Delete calls are DeltaFIFO's Replace, Add, Update, and Delete methods.
func (c *controller) Run(stopCh <-chan struct{}) {
...
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
// reflector.store = deltafifo
c.config.Queue,
c.config.FullResyncPeriod,
)
...
}
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
...
// the store passed in when the Reflector is instantiated is the DeltaFIFO
store: store,
...
}
...
return r
}
The core DeltaFIFO methods are Replace, Add, Update, and Delete (Pop and HasSynced are covered below as well).
3.1 DeltaFIFO.Add
The main logic of DeltaFIFO.Add:
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// call f.queueActionLocked to update the DeltaFIFO's queue and items: construct a new Added-type Delta for the object key and append it to the corresponding Deltas
return f.queueActionLocked(Added, obj)
}
3.1.1 DeltaFIFO.queueActionLocked
queueActionLocked updates the DeltaFIFO's queue and items: it constructs a new Delta for the object key and appends it to the corresponding Deltas. Its main logic:
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 1. compute the object's key
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 2. construct a new Delta and append it to the end of this object's Deltas
newDeltas := append(f.items[id], Delta{actionType, obj})
// 3. call dedupDeltas to de-duplicate (in practice only two consecutive Deleted deltas for the same object are merged)
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
// 4. if the object key is not already present in items (and therefore not in the queue), append it to the queue
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
// 5. update the Deltas for this object key in items
f.items[id] = newDeltas
// 6. wake up all blocked consumers
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}
3.2 DeltaFIFO.Update
The main logic of DeltaFIFO.Update:
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
3.3 DeltaFIFO.Delete
The main logic of DeltaFIFO.Delete:
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// f.knownObjects = threadSafeMap
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// the object key is not in items either, so return directly and skip processing
return nil
}
} else {
// We only want to skip the "deletion" action if the object doesn't
// exist in knownObjects and it doesn't have corresponding item in items.
// Note that even if there is a "deletion" action in items, we can ignore it,
// because it will be deduped automatically in "queueActionLocked"
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
}
// call f.queueActionLocked to update the DeltaFIFO's queue and items: construct a Deleted-type Delta for the object key and append it to the corresponding Deltas
return f.queueActionLocked(Deleted, obj)
}
3.4 DeltaFIFO.Replace
The main logic of DeltaFIFO.Replace:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 1. list contains the objects: iterate over it, compute each object's key, and call f.queueActionLocked for each one to append a Sync-type Delta to the corresponding Deltas
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// f.knownObjects is the threadSafeMap (the indexer), so it is not nil here
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
// number of deltas produced by this Replace that still need to be processed
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// 2. find keys that exist in the indexer but are missing from the list passed to Replace: the object is gone from the cluster but still cached, so a Deleted-type Delta wrapping DeletedFinalStateUnknown is enqueued for it (see the sketch after this function)
// via f.queueActionLocked, which appends the Deleted Delta to the corresponding Deltas
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// after the first call to Replace, populated is true
if !f.populated {
f.populated = true
// initialPopulationCount is the number of items the first Replace call added to the DeltaFIFO
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
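A runnable sketch of the deletion detection above, again written against the client-go version discussed here (the object name is arbitrary): an indexer passed as knownObjects still holds an object that the new list no longer contains, so Replace enqueues a Deleted Delta wrapping DeletedFinalStateUnknown.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// knownObjects: an indexer that still caches "default/gone"
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "gone", Namespace: "default"}})

	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, indexer)

	// An empty relist: "default/gone" is known to the indexer but missing from the list.
	fifo.Replace([]interface{}{}, "1")

	fifo.Pop(func(obj interface{}) error {
		d := obj.(cache.Deltas).Newest()
		fmt.Printf("%v %T\n", d.Type, d.Object) // Deleted cache.DeletedFinalStateUnknown
		return nil
	})
}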
3.5 DeltaFIFO.Pop
DeltaFIFO.Pop pops Deltas out of items and hands them to the PopProcessFunc callback, which here is sharedIndexInformer.HandleDeltas. Its main logic:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
// 1. while the queue is empty, block in f.cond.Wait(), waiting for the notification sent by f.cond.Broadcast() in queueActionLocked (i.e. when an object key lands in the queue)
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 2. take the object key at the head of the queue
id := f.queue[0]
// 3. advance the queue, effectively popping the first object key
f.queue = f.queue[1:]
// 4. decrement initialPopulationCount; when it reaches 0, all object keys added by the first Replace call have been popped
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 5. look up the Deltas for this key in items
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
// 6. remove the entry from items
delete(f.items, id)
// 7. call the PopProcessFunc to process the popped Deltas
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
3.6 DeltaFIFO.HasSynced
HasSynced reports whether all objects from the initial full list from kube-apiserver have been popped out of the DeltaFIFO; once they have, the listed objects have all been synced into the Indexer cache (a usage sketch follows the code below).
① populated is set to true by the first call to DeltaFIFO's Replace method.
② initialPopulationCount is set by the first Replace call to the number of items added to the DeltaFIFO; it is decremented on every Pop and reaches 0 once they have all been popped.
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
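A small usage sketch: sharedIndexInformer.HasSynced reports, via the controller, exactly this DeltaFIFO.HasSynced value, and cache.WaitForCacheSync polls it until it returns true or stopCh is closed (waitForPodCache is a hypothetical helper name):
func waitForPodCache(informer cache.SharedIndexInformer, stopCh <-chan struct{}) bool {
	return cache.WaitForCacheSync(stopCh, informer.HasSynced)
}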
V. sharedProcessor source code analysis
1. sharedProcessor
A handler added through AddEventHandler is wrapped in a processorListener, which the sharedProcessor then manages via its listeners and syncingListeners slices.
// client-go/tools/cache/shared_informer.go
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
pendingNotifications buffer.RingGrowing
// ...
}
2.sharedIndexInformer.AddEventHandlerWithResyncPeriod
It wraps the ResourceEventHandler in a processorListener and adds the listener to the sharedProcessor's listeners and syncingListeners.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// wrap the ResourceEventHandler in a processorListener
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
// add the processorListener to the sharedProcessor's listeners and syncingListeners
s.processor.addListener(listener)
return
}
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
// if the sharedProcessor has already started, start listener.run and listener.pop right away
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
3.sharedProcessor.distribute
The distribute method ultimately writes the constructed addNotification/updateNotification/deleteNotification into each listener's p.addCh channel.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
// write the addNotification/updateNotification/deleteNotification into p.addCh
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
// write the addNotification/updateNotification/deleteNotification into p.addCh
listener.add(obj)
}
}
}
func (p *processorListener) add(notification interface{}) {
// write the addNotification/updateNotification/deleteNotification into p.addCh
p.addCh <- notification
}
4.processorListener.pop
processorListener's pop method takes notifications out of p.addCh and feeds them into p.nextCh, buffering them in pendingNotifications when the consumer is slower than the producer.
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 0. nextCh is nil at first, so this send case blocks
// 3. write the notification into nextCh
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 1. take a notification from p.addCh
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
// 2. point nextCh at p.nextCh so the send case becomes active
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
5.processorListener.run
Running the processorListener means reading notification objects from p.nextCh and calling handler.OnAdd/OnUpdate/OnDelete according to the notification type.
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
6.ResourceEventHandler
// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
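One practical note, not from the original post: during a resync the UpdateFunc is called with the same cached object as both oldObj and newObj, so handlers commonly compare ResourceVersions to skip no-op updates (assuming corev1 is k8s.io/api/core/v1):
func onPodUpdate(oldObj, newObj interface{}) {
	oldPod := oldObj.(*corev1.Pod)
	newPod := newObj.(*corev1.Pod)
	if oldPod.ResourceVersion == newPod.ResourceVersion {
		return // periodic resync: nothing actually changed
	}
	// handle the real update here
}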
VI. Indexer source code analysis
2. Indexer structure definitions
2.1 Store
The Store interface declares methods such as Add, Update, Delete, List, and Get for creating, updating, deleting, and querying objects; it is used to operate on the informer's local cache items.
// staging/src/k8s.io/client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
2.2 Indexer
The Indexer interface embeds the Store interface (which provides the local cache) and adds several index-related method declarations (which provide the indexing functionality).
// staging/src/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
2.3 cache
cache is an implementation of the Indexer interface; it holds an implementation of the ThreadSafeStore interface plus a KeyFunc for computing object keys.
cache uses keyFunc to generate a unique key for a given object, then calls the ThreadSafeStore methods to operate on the objects in the local cache.
// staging/src/k8s.io/client-go/tools/cache/store.go
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
2.4 ThreadSafeStore
The ThreadSafeStore interface contains the create/update/delete/query methods for the local cache as well as the index-related methods.
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
2.5 threadSafeMap
threadSafeMap is an implementation of the ThreadSafeStore interface. Resource objects live in the items map, whose key is computed from the object and whose value is the object itself; items is the informer's local cache, while the indexers and indices fields provide the indexing functionality.
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
// local storage of resource objects: key is computed from the object, value is the object itself
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
3. Indexer's indexing functionality
In threadSafeMap, the fields related to indexing are indexers and indices;
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
// staging/src/k8s.io/client-go/tools/cache/index.go
const (
NamespaceIndex string = "namespace"
)
type IndexFunc func(obj interface{}) ([]string, error)
type Indexers map[string]IndexFunc
type Indices map[string]Index
type Index map[string]sets.String
3.1 Indexers
Indexers contains every indexer (index category) and its index function IndexFunc; an IndexFunc computes an object's index keys.
Indexers: {
"indexer name 1": indexFunc1,
"indexer name 2": indexFunc2,
}
Example data:
Indexers: {
"namespace": MetaNamespaceIndexFunc, // index by namespace; MetaNamespaceIndexFunc computes the object's index key
}
3.2 Indices
Indices contains, for every indexer, all of its index data (an Index); an Index maps each index key to the list of object keys stored under it.
Indices: {
"indexer name 1": {
"index key 1": ["object key 1", "object key 2"],
"index key 2": ["object key 3"],
},
"indexer name 2": {
"index key 3": ["object key 1"],
"index key 4": ["object key 2", "object key 3"],
}
}
Example data:
pod1 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-1",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node1",
}
}
pod2 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-2",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node2",
}
}
pod3 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-3",
Namespace: "kube-system",
},
Spec: v1.PodSpec{
NodeName: "node2",
}
}
When a resource object is stored into the local indexer cache, it is also added to the index structure below (grouping k8s resource objects by namespace, node name, and so on). Through this index structure all pods in the default namespace (pod-1 and pod-2) can be found directly, without scanning the whole local cache.
Indices: {
"namespace": { // the indexer named "namespace"
"default": ["pod-1", "pod-2"], // the index key "default" maps to the keys of all cached objects in that namespace
"kube-system": ["pod-3"], // likewise for the "kube-system" namespace
},
"nodeName": { // the indexer named "nodeName"
"node1": ["pod-1"], // the index function extracts each object's nodeName
"node2": ["pod-2", "pod-3"],
}
}
When an object is added to items, MetaNamespaceIndexFunc computes the object's namespace (e.g. default), and the object's key (e.g. default/pod-2) is added to the set under that index key.
When the resource objects in a namespace (e.g. default) are requested, the corresponding list of keys is returned and the objects are then fetched from items by key, as in the runnable sketch below.
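A runnable sketch of this index behaviour (assuming the listed imports; the nodeName index function is a custom one added for illustration, mirroring the example data above):
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Keys are namespace/name; one built-in namespace index plus a custom nodeName index.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
		"nodeName": func(obj interface{}) ([]string, error) {
			return []string{obj.(*corev1.Pod).Spec.NodeName}, nil
		},
	})

	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", Namespace: "default"}, Spec: corev1.PodSpec{NodeName: "node1"}})
	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-2", Namespace: "default"}, Spec: corev1.PodSpec{NodeName: "node2"}})
	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-3", Namespace: "kube-system"}, Spec: corev1.PodSpec{NodeName: "node2"}})

	// ByIndex resolves object keys through the index and returns the cached objects directly.
	objs, _ := indexer.ByIndex(cache.NamespaceIndex, "default")
	for _, obj := range objs {
		fmt.Println(obj.(*corev1.Pod).Name) // pod-1, pod-2
	}

	onNode2, _ := indexer.ByIndex("nodeName", "node2")
	fmt.Println(len(onNode2)) // 2
}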
4. The Indexer local cache
In the earlier analysis of the informer's Controller (code repeated below), the s.indexer.Add, s.indexer.Update, s.indexer.Delete, and s.indexer.Get calls ultimately end up in threadSafeMap.Add, threadSafeMap.Update, threadSafeMap.Delete, and threadSafeMap.Get;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
4.1 threadSafeMap.Add
Call chain: s.indexer.Add --> cache.Add --> threadSafeMap.Add
threadSafeMap.Add stores the key:object pair into items and then calls updateIndices to update the Indices.
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
Note also that the methods operating on threadSafeMap generally acquire the lock first and release it when the method finishes, so they are safe for concurrent use.
4.3 threadSafeMap.Delete
Call chain: s.indexer.Delete --> cache.Delete --> threadSafeMap.Delete
In threadSafeMap.Delete, if the key exists in the local items cache, deleteFromIndices is called first to remove the related index entries, and then the key and its object are deleted from items.
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()
defer c.lock.Unlock()
if obj, exists := c.items[key]; exists {
c.deleteFromIndices(obj, key)
delete(c.items, key)
}
}
4.4 threadSafeMap.Get
Call chain: s.indexer.Get --> cache.Get --> threadSafeMap.Get
threadSafeMap.Get is comparatively simple: it does not touch the indices, it just looks the object up in items by key and returns it.
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}