APIServer list-watch 的实现

前文讲了informer是在客户端如何工作的,但是在server端如何响应 watch-list请求的。没有介绍。下面就来详细介绍一下。

list-watch操作需要做这么几件事

由组件向apiserver而不是etcd发起watch请求,在组件启动时就进行订阅,告诉apiserver需要知道什么数据发生变化。Watch是一个典型的发布-订阅模式。

组件向apiserver发起的watch请求是可以带条件的,例如,scheduler想要watch的是所有未被调度的Pod,也就是满足Pod.destNode=””的Pod来进行调度操作;而kubelet只关心自己节点上的Pod列表。apiserver向etcd发起的watch是没有条件的,只能知道某个数据发生了变化或创建、删除,但不能过滤具体的值。也就是说对象数据的条件过滤必须在apiserver端而不是etcd端完成。

list是watch失败,数据太过陈旧后的弥补手段,这方面详见 基于list-watch的Kubernetes异步事件处理框架客户端部分。list本身是一个简单的列表操作,和其它apiserver的增删改操作一样,不再多描述细节。

watch的API处理

既然watch本身是一个apiserver提供的http restful的API,那么就按照API的方式去阅读它的代码,按照apiserver的基础功能实现一文所描述,我们来看它的代码,

关键的处理API注册代码pkg/apiserver/api_installer.go

  1. 函数 func (a *APIInstaller) registerResourceHandlers()
    路径 staging/src/k8s.io/apiserver/pkg/endpoints/installer.go 一个rest.Storage对象会被转换为watcher和lister对象
// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)

一个rest.Storage对象会被转换为watcher和lister对象,提供list和watch服务的入口是同一个,在API接口中是通过 GET /pods?watch=true 这种方式来区分是list还是watch,API处理函数是由lister和watcher经过ListResource()合体后完成的。

case "LIST": // List all resources of a kind.

handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
        
route := ws.GET(action.Path).To(handler).
    Doc(doc).
    Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
    Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
        Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
Returns(http.StatusOK, "OK", versionedList).
Writes(versionedList)
        
switch {
case isLister && isWatcher:
            
case isWatcher:
            
addParams(route, action.Params)
routes = append(routes, route)
  restfulListResource函数最终去向为ListResource,路径vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go

restfulListResource函数最终去向为ListResource,路径vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go

func restfulListResource(r rest.Lister, rw rest.Watcher, scope handlers.RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
   return func(req *restful.Request, res *restful.Response) {
      handlers.ListResource(r, rw, scope, forceWatch, minRequestTimeout)(res.ResponseWriter, req.Request)
   }
}

函数 func ListResource函数

每次有一个watch的url请求过来,都会调用rw.Watch()创建一个watcher
使用serveWatch()来处理这个请求

func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // For performance tracking purposes.
        trace := utiltrace.New("List", traceFields(req)...)

        namespace, err := scope.Namer.Namespace(req)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        // Watches for single objects are routed to this function.
        // Treat a name parameter the same as a field selector entry.
        hasName := true
        _, name, err := scope.Namer.Name(req)
        if err != nil {
            hasName = false
        }

        ctx := req.Context()
        ctx = request.WithNamespace(ctx, namespace)

        outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        opts := metainternalversion.ListOptions{}
        if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
            err = errors.NewBadRequest(err.Error())
            scope.err(err, w, req)
            return
        }

        if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
            err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
            scope.err(err, w, req)
            return
        }

        // transform fields
        // TODO: DecodeParametersInto should do this.
        if opts.FieldSelector != nil {
            fn := func(label, value string) (newLabel, newValue string, err error) {
                return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)
            }
            if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
                // TODO: allow bad request to set field causes based on query parameters
                err = errors.NewBadRequest(err.Error())
                scope.err(err, w, req)
                return
            }
        }

        if hasName {
            // metadata.name is the canonical internal name.
            // SelectionPredicate will notice that this is a request for
            // a single object and optimize the storage query accordingly.
            nameSelector := fields.OneTermEqualSelector("metadata.name", name)

            // Note that fieldSelector setting explicitly the "metadata.name"
            // will result in reaching this branch (as the value of that field
            // is propagated to requestInfo as the name parameter.
            // That said, the allowed field selectors in this branch are:
            // nil, fields.Everything and field selector matching metadata.name
            // for our name.
            if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
                selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name")
                if !ok || name != selectedName {
                    scope.err(errors.NewBadRequest("fieldSelector metadata.name doesn't match requested name"), w, req)
                    return
                }
            } else {
                opts.FieldSelector = nameSelector
            }
        }

        if opts.Watch || forceWatch {
            if rw == nil {
                scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), "watch"), w, req)
                return
            }
            // TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
            timeout := time.Duration(0)
            if opts.TimeoutSeconds != nil {
                timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
            }
            if timeout == 0 && minRequestTimeout > 0 {
                timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
            }
            klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
            ctx, cancel := context.WithTimeout(ctx, timeout)
            defer cancel()
            watcher, err := rw.Watch(ctx, &opts)
            if err != nil {
                scope.err(err, w, req)
                return
            }
            requestInfo, _ := request.RequestInfoFrom(ctx)
            metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
                serveWatch(watcher, scope, outputMediaType, req, w, timeout)
            })
            return
        }

        // Log only long List requests (ignore Watch).
        defer trace.LogIfLong(500 * time.Millisecond)
        trace.Step("About to List from storage")
        result, err := r.List(ctx, &opts)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        trace.Step("Listing from storage done")

        transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
        trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
    }
}

func (s *WatchServer) ServeHTTP()函数


for {
    select {
    case event, ok := <-ch: 
 
        obj := event.Object
        s.Fixup(obj)
        if err := s.EmbeddedEncoder.Encode(obj, buf); 
 
        unknown.Raw = buf.Bytes()
        event.Object = &unknown
 
        outEvent := &metav1.WatchEvent{}
        *internalEvent = metav1.InternalEvent(event)
        err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil)
            
        if err := e.Encode(outEvent); 
        if len(ch) == 0 {
            flusher.Flush()
        }
 
        buf.Reset()
}

watcher(watch.Interface)对象是被staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go 对象创建出来的。watch是所有通用的函数,根据label和field过滤

func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
    label := labels.Everything()
    if options != nil && options.LabelSelector != nil {
        label = options.LabelSelector
    }
    field := fields.Everything()
    if options != nil && options.FieldSelector != nil {
        field = options.FieldSelector
    }
    predicate := e.PredicateFunc(label, field)
 
    resourceVersion := ""
    if options != nil {
        resourceVersion = options.ResourceVersion
        predicate.IncludeUninitialized = options.IncludeUninitialized
    }
    return e.WatchPredicate(ctx, predicate, resourceVersion)
}

Cacher

结构体 cahcer

  • storage提供了增删改查watch list接口,主要面向etcd
  • watchCache有限容量的滑动窗口
  • watchers接口体包括所有请求watch的map
type Cacher struct {
 
    // Underlying storage.Interface.
    storage Interface
 
    // Expected type of objects in the underlying cache.
    objectType reflect.Type
 
    // "sliding window" of recent changes of objects and the current state.
    watchCache *watchCache
    reflector  *cache.Reflector
 
    watcherIdx int
    watchers   indexedWatchers
}

2.2 函数 func (c *Cacher) Watch
newCacheWatcher生成一个watcher
addwatcher并将watcher插入到cacher.watchers中
// Implements storage.Interface.

func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
    watchRV, err := ParseWatchResourceVersion(resourceVersion)
    if err != nil {
        return nil, err
    }
 
    c.ready.wait()
 
    initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
 
    chanSize := 10
 
    forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
    watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
 
    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
    c.watcherIdx++
    return watcher, nil

2.3 函数 func newCacheWatcher
newCacheWatcher生成一个watcher
process异步处理

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
    watcher := &cacheWatcher{
        input:     make(chan *watchCacheEvent, chanSize),
        result:    make(chan watch.Event, chanSize),
        done:      make(chan struct{}),
        filter:    filter,
        stopped:   false,
        forget:    forget,
        versioner: versioner,
    }
    go watcher.process(initEvents, resourceVersion)
    return watcher
}

2.3 函数 func (c *cacheWatcher) process
process函數主要读取input调用sendWatchCacheEvent,继续sendWatchCacheEnven函数

func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
    for _, event := range initEvents {
        c.sendWatchCacheEvent(event)
    }
    processingTime := time.Since(startTime)
    if processingTime > initProcessThreshold {
        objType := "<null>"
        if len(initEvents) > 0 {
            objType = reflect.TypeOf(initEvents[0].Object).String()
        }
        glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
    }
 
    defer close(c.result)
    defer c.Stop()
    for {
        event, ok := <-c.input
        if !ok {
            return
        }
        // only send events newer than resourceVersion
        if event.ResourceVersion > resourceVersion {
            c.sendWatchCacheEvent(event)
        }
    }
}

2.4 函数func (c *cacheWatcher) sendWatchCacheEvent
watchCacheEvent进行Filter,发送到cacher.result channel中
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!

func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
    curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
    oldObjPasses := false
    if event.PrevObject != nil {
        oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
    }
    if !curObjPasses && !oldObjPasses {
        // Watcher is not interested in that object.
        return
    }
 
    var watchEvent watch.Event
    switch {
    case curObjPasses && !oldObjPasses:
        watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
    case curObjPasses && oldObjPasses:
        watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
    case !curObjPasses && oldObjPasses:
        // return a delete event with the previous object content, but with the event's resource version
        oldObj := event.PrevObject.DeepCopyObject()
        if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
            utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
        }
        watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
    }
 
    select {
    case <-c.done:
        return
    default:
    }
 
    select {
    case c.result <- watchEvent:
    case <-c.done:
    }
}
 ```

三. 实现接口部分
pkg/registry/core/pod/storage/storage.go
3.1 NewPodStorage函数
 PodStorage.Pod.Store封装了对etcd的操作;
```
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
 
   store := &genericregistry.Store{
      NewFunc:                  func() runtime.Object { return &api.Pod{} },
      NewListFunc:              func() runtime.Object { return &api.PodList{} },
      PredicateFunc:            pod.MatchPod,
      DefaultQualifiedResource: api.Resource("pods"),
 
      CreateStrategy:      pod.Strategy,
      UpdateStrategy:      pod.Strategy,
      DeleteStrategy:      pod.Strategy,
      ReturnDeletedObject: true,
 
      TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
   }
   options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
   if err := store.CompleteWithOptions(options); err != nil {
      panic(err) // TODO: Propagate error up
   }
 
   statusStore := *store
   statusStore.UpdateStrategy = pod.StatusStrategy
 
   return PodStorage{
      Pod:         &REST{store, proxyTransport},
      Binding:     &BindingREST{store: store},
      Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
      Status:      &StatusREST{store: &statusStore},
      Log:         &podrest.LogREST{Store: store, KubeletConn: k},
      Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
      Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
      Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
      PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
   }
}
```
3.2 CompleWithOptions函数
路径:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
主要函数是GetRESTOptions
```
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    if err != nil {
        return err
    }
 
    if e.Storage == nil {
        e.Storage, e.DestroyFunc = opts.Decorator(
            opts.StorageConfig,
            e.NewFunc(),
            prefix,
            keyFunc,
            e.NewListFunc,
            attrFunc,
            triggerFunc,
        )
    }
 
    return nil
}

3.3 GetRESTOptinons函数
路径:/vendor/k8s.io/apiserver/pkg/server/options/etcd.go
调用genericregistry.StorageWithCacher创建cache store

func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }
 
    ret := generic.RESTOptions{
        StorageConfig:           storageConfig,
        Decorator:               generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }
 
    return ret, nil
}

3.4 StorageWithCacher函数
路径: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
根据配置创建cacher
NewRawStorage根据配置类型,具体是创建etcd v2或者v3 client
// Creates a cacher based given storageConfig.

func StorageWithCacher(capacity int) generic.StorageDecorator {
    return func(
        storageConfig *storagebackend.Config,
        objectType runtime.Object,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
 
        s, d := generic.NewRawStorage(storageConfig)
        if capacity == 0 {
            glog.V(5).Infof("Storage caching is disabled for %T", objectType)
            return s, d
        }
        
        cacherConfig := storage.CacherConfig{
            CacheCapacity:        capacity,
            Storage:              s,
            Versioner:            etcdstorage.APIObjectVersioner{},
            Type:                 objectType,
            ResourcePrefix:       resourcePrefix,
            KeyFunc:              keyFunc,
            NewListFunc:          newListFunc,
            GetAttrsFunc:         getAttrsFunc,
            TriggerPublisherFunc: triggerFunc,
            Codec:                storageConfig.Codec,
        }
        cacher := storage.NewCacherFromConfig(cacherConfig)
        destroyFunc := func() {
            cacher.Stop()
            d()
        }
 
        RegisterStorageCleanup(destroyFunc)
        return cacher, destroyFunc
    }
}

3.5 NewCacherFromConfig函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
创建cacher服务于list-watch创建watchCache对象和cacheListerWatcher对象,cacheListWatcher对象是ListerWatcher接口实现,实现了List()和Watch()方法
构建Cacher对象
(1) watchCache是一个结构体,用来存储apiserver从etcd watch到的对象
(2) watchers是一个indexedWatchers结构体,当kubelet,scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源从apiserver发送到kubelet, scheduler
(3) Reflector结构体数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}
        (4) incoming channel接收watchCacheEvent的100个空间的channel

协程cacher.dispatchEvents,watchCache将incoming channel接收watchCacheEvent添加到watchers的inputChan中

func (c *Cacher) dispatchEvents() {
    for {
        select {
        case event, ok := <-c.incoming:
            if !ok {
                return
            }
            c.dispatchEvent(&event)
        case <-c.stopCh:
            return
        }
    }
}
 
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
    triggerValues, supported := c.triggerValues(event)
 
    for _, watcher := range c.watchers.allWatchers {
        watcher.add(event, c.dispatchTimeoutBudget)
    }
    if supported {
 
        for _, triggerValue := range triggerValues {
            for _, watcher := range c.watchers.valueWatchers[triggerValue] {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    } else {
        for _, watchers := range c.watchers.valueWatchers {
            for _, watcher := range watchers {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    }
}

协程cacher.startCaching

func NewCacherFromConfig(config CacherConfig) *Cacher {
    watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
    listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
    reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
    stopCh := make(chan struct{})
    cacher := &Cacher{
        
    }
    watchCache.SetOnEvent(cacher.processEvent)
    go cacher.dispatchEvents()
 
    cacher.stopWg.Add(1)
    go func() {
        defer cacher.stopWg.Done()
        wait.Until(
            func() {
                if !cacher.isStopped() {
                    cacher.startCaching(stopCh)
                }
            }, time.Second, stopCh,
        )
    }()
    return cacher
}

3.6 startCaching函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
主要函数reflecrtor.ListAndWatch函数,把远端etcd数据同步到本地的方法,存在watchCache

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
    successfulList := false
    c.watchCache.SetOnReplace(func() {
        successfulList = true
        c.ready.set(true)
    })
    defer func() {
        if successfulList {
            c.ready.set(false)
        }
    }()
 
    c.terminateAllWatchers()
    if err := c.reflector.ListAndWatch(stopChannel); err != nil {
        glog.Errorf("unexpected ListAndWatch error: %v", err)
    }
}

3.7 ListAndWatch函数
路径: vendor/k8s.io/client-go/tools/cache/reflector.go
首先list所有条目,根据版本进行watch
调用listerWatcher.List方法获得
调用listerWatcher.Watch进行操作

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    list, err := r.listerWatcher.List(options)
    
    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
 
    for {
        w, err := r.listerWatcher.Watch(options)
 
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            
            return nil
        }
    }
}

3.8 newCacherListerWatcher函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
storage接口包含list watch接口

func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
    return &cacherListerWatcher{
        storage:        storage,
        resourcePrefix: resourcePrefix,
        newListFunc:    newListFunc,
    }
}
 
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
    list := lw.newListFunc()
    if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
        return nil, err
    }
    return list, nil
}
 
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
    return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

3.9 NewRawStorage函数
路径: vendor/k8s.io/apiserver/pkg/registry/gereric/registry/storage_factory.go
往回找到,最终去向是etcd,应该是k8s1.6版本从v2更新至v3

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
    s, d, err := factory.Create(*config)
    
    return s, d
}
 
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case storagebackend.StorageTypeETCD2:
        return newETCD2Storage(c)
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        
        return newETCD3Storage(c)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}

3.10 newETCD3Storage函数
路径: vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd2.go
对就是etcdv3 client实现了list watch接口

func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    
    client, err := clientv3.New(cfg)
 
    if c.Quorum {
        return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
    }
    return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
 
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error 
 
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursi
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容