前文讲了informer在客户端是如何工作的,但没有介绍server端是如何响应list-watch请求的。下面就来详细介绍一下。
list-watch操作需要做这么几件事
由组件向apiserver而不是etcd发起watch请求,在组件启动时就进行订阅,告诉apiserver需要知道什么数据发生变化。Watch是一个典型的发布-订阅模式。
组件向apiserver发起的watch请求是可以带条件的,例如,scheduler想要watch的是所有未被调度的Pod,也就是满足 spec.nodeName="" 的Pod来进行调度操作;而kubelet只关心自己节点上的Pod列表。apiserver向etcd发起的watch是没有条件的,只能知道某个数据发生了变化或被创建、删除,但不能按具体的值过滤。也就是说,对象数据的条件过滤必须在apiserver端而不是etcd端完成。
list是watch失败,数据太过陈旧后的弥补手段,这方面详见 基于list-watch的Kubernetes异步事件处理框架客户端部分。list本身是一个简单的列表操作,和其它apiserver的增删改操作一样,不再多描述细节。
watch的API处理
既然watch本身是一个apiserver提供的http restful的API,那么就按照API的方式去阅读它的代码,按照apiserver的基础功能实现一文所描述,我们来看它的代码,
关键的处理API注册代码pkg/apiserver/api_installer.go
- 函数 func (a *APIInstaller) registerResourceHandlers()
路径 staging/src/k8s.io/apiserver/pkg/endpoints/installer.go 一个rest.Storage对象会被转换为watcher和lister对象
// what verbs are supported by the storage, used to know what verbs we support per path
// Each type assertion yields the typed view of storage plus a flag telling
// whether storage implements that interface.
// NOTE(review): watcher/isWatcher, which later snippets in this article use,
// are not assigned in this excerpt — presumably through a matching
// storage.(rest.Watcher) assertion; confirm against installer.go.
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
一个rest.Storage对象会被转换为watcher和lister对象,提供list和watch服务的入口是同一个,在API接口中是通过 GET /pods?watch=true 这种方式来区分是list还是watch,API处理函数是由lister和watcher经过ListResource()合体后完成的。
case "LIST": // List all resources of a kind.
// Build the combined list/watch handler (forceWatch is false here) and wrap
// it with metrics.InstrumentRouteFunc.
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
// Bind the handler to GET on the action's path and describe the route.
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
Returns(http.StatusOK, "OK", versionedList).
Writes(versionedList)
// NOTE: the case bodies and the closing brace of this switch are elided in
// this excerpt.
switch {
case isLister && isWatcher:
case isWatcher:
// Attach the action's parameters and record the finished route.
addParams(route, action.Params)
routes = append(routes, route)
restfulListResource函数最终去向为ListResource,路径vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go
// restfulListResource adapts handlers.ListResource to a go-restful route
// function.
//
// r and rw are the list and watch backends, scope carries the per-resource
// request scope, forceWatch and minRequestTimeout are forwarded unchanged.
// The returned function unwraps the restful request/response pair and hands
// the underlying net/http values to the handler built by ListResource.
func restfulListResource(r rest.Lister, rw rest.Watcher, scope handlers.RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		// ListResource, as quoted in this document, takes a *RequestScope, so
		// pass the address of the by-value scope parameter.
		handlers.ListResource(r, rw, &scope, forceWatch, minRequestTimeout)(res.ResponseWriter, req.Request)
	}
}
函数 func ListResource
每次有一个watch的url请求过来,都会调用rw.Watch()创建一个watcher
使用serveWatch()来处理这个请求
// ListResource returns an http.HandlerFunc that serves both list and watch
// requests for a resource.
//
// r serves plain list requests; rw serves watch requests (a nil rw makes a
// watch request fail with a "method not supported" error). forceWatch treats
// every request as a watch regardless of the decoded Watch option.
// minRequestTimeout is used to derive a randomized watch timeout when the
// request carries no timeoutSeconds. Errors are reported through scope.err.
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("List", traceFields(req)...)
namespace, err := scope.Namer.Namespace(req)
if err != nil {
scope.err(err, w, req)
return
}
// Watches for single objects are routed to this function.
// Treat a name parameter the same as a field selector entry.
hasName := true
_, name, err := scope.Namer.Name(req)
if err != nil {
// A failure to extract a name is not an error here; it only means the
// request does not target a single named object.
hasName = false
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
// Decode the URL query into ListOptions and validate the result.
opts := metainternalversion.ListOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
scope.err(err, w, req)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil {
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
}
if hasName {
// metadata.name is the canonical internal name.
// SelectionPredicate will notice that this is a request for
// a single object and optimize the storage query accordingly.
nameSelector := fields.OneTermEqualSelector("metadata.name", name)
// Note that fieldSelector setting explicitly the "metadata.name"
// will result in reaching this branch (as the value of that field
// is propagated to requestInfo as the name parameter.
// That said, the allowed field selectors in this branch are:
// nil, fields.Everything and field selector matching metadata.name
// for our name.
if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name")
if !ok || name != selectedName {
scope.err(errors.NewBadRequest("fieldSelector metadata.name doesn't match requested name"), w, req)
return
}
} else {
opts.FieldSelector = nameSelector
}
}
// Watch path: taken when the request asks for a watch or the route forces it.
if opts.Watch || forceWatch {
if rw == nil {
scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), "watch"), w, req)
return
}
// TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
// No explicit timeout: scale minRequestTimeout by a random factor of
// (rand.Float64() + 1.0) so that watches do not all expire together.
if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
// NOTE(review): timeout can still be 0 here (no timeoutSeconds and
// minRequestTimeout <= 0); confirm that a zero-duration deadline is intended.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx)
// Serve the watch inside the long-running request metrics wrapper.
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
// Log only long List requests (ignore Watch).
defer trace.LogIfLong(500 * time.Millisecond)
trace.Step("About to List from storage")
result, err := r.List(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Listing from storage done")
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
}
}
func (s *WatchServer) ServeHTTP()函数
// Abridged excerpt of the event loop: several declarations, the bodies of the
// error checks and some closing braces are elided.
for {
select {
case event, ok := <-ch:
obj := event.Object
s.Fixup(obj)
// Encode the event's object into buf with the embedded encoder.
if err := s.EmbeddedEncoder.Encode(obj, buf);
// Replace the event's object with the already-encoded bytes.
unknown.Raw = buf.Bytes()
event.Object = &unknown
outEvent := &metav1.WatchEvent{}
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil)
// Encode the converted event with e.
if err := e.Encode(outEvent);
// Flush only when no further events are queued on ch.
if len(ch) == 0 {
flusher.Flush()
}
// Empty buf so the next event starts from a clean buffer.
buf.Reset()
}
watcher(watch.Interface)对象是由staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go 中的Store创建出来的。Watch是所有资源通用的函数,根据label和field进行过滤
// Watch starts a watch restricted by the label and field selectors found in
// options. A nil options value, or a nil selector inside it, selects
// everything. When options is non-nil its ResourceVersion and
// IncludeUninitialized settings are carried over to the watch.
func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	labelSel, fieldSel := labels.Everything(), fields.Everything()
	if options != nil {
		if options.LabelSelector != nil {
			labelSel = options.LabelSelector
		}
		if options.FieldSelector != nil {
			fieldSel = options.FieldSelector
		}
	}

	pred := e.PredicateFunc(labelSel, fieldSel)

	var rv string
	if options != nil {
		rv = options.ResourceVersion
		pred.IncludeUninitialized = options.IncludeUninitialized
	}
	return e.WatchPredicate(ctx, pred, rv)
}
Cacher
结构体 Cacher
- storage提供了增删改查watch list接口,主要面向etcd
- watchCache有限容量的滑动窗口
- watchers结构体,保存了所有发起watch请求的watcher的map
// Cacher sits in front of an underlying storage.Interface and serves watches
// from an in-memory cache (abridged excerpt of the struct).
type Cacher struct {
// Underlying storage.Interface.
storage Interface
// Expected type of objects in the underlying cache.
objectType reflect.Type
// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
// Reflector whose ListAndWatch is run by startCaching.
reflector *cache.Reflector
// Index handed to the next registered watcher; incremented by Watch.
watcherIdx int
// Registered watchers; Watch adds to it via addWatcher.
watchers indexedWatchers
}
2.2 函数 func (c *Cacher) Watch
newCacheWatcher生成一个watcher
addwatcher并将watcher插入到cacher.watchers中
// Implements storage.Interface.
// Watch creates a cacheWatcher for the request and registers it with the
// cacher. NOTE: abridged excerpt — triggerValue/triggerSupported, the check of
// the second err and the closing brace are not shown.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
// Block until the cacher reports ready.
c.ready.wait()
// Fetch the cached events after watchRV; they are passed to the new watcher
// as its initial events.
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
chanSize := 10
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
// Register the watcher under the current index, then advance the index.
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
return watcher, nil
2.3 函数 func newCacheWatcher
newCacheWatcher生成一个watcher
process异步处理
// newCacheWatcher allocates a cacheWatcher whose input and result channels
// are both buffered to chanSize, wires in filter, forget and versioner, and
// starts a goroutine running process with initEvents and resourceVersion
// before returning the watcher.
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
	cw := new(cacheWatcher)
	cw.input = make(chan *watchCacheEvent, chanSize)
	cw.result = make(chan watch.Event, chanSize)
	cw.done = make(chan struct{})
	cw.filter = filter
	cw.stopped = false
	cw.forget = forget
	cw.versioner = versioner

	go cw.process(initEvents, resourceVersion)
	return cw
}
2.3 函数 func (c *cacheWatcher) process
process函数主要从input中读取事件并调用sendWatchCacheEvent,下面继续看sendWatchCacheEvent函数
// process is the watcher's delivery loop. It first sends every event in
// initEvents, logging when that took longer than initProcessThreshold
// (defined outside this excerpt), and then forwards events read from c.input
// whose ResourceVersion is newer than resourceVersion. When c.input is closed
// the watcher is stopped and c.result is closed.
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
	// Start the clock before replaying the initial events; the measurement
	// below reads it through time.Since.
	startTime := time.Now()
	for _, event := range initEvents {
		c.sendWatchCacheEvent(event)
	}
	processingTime := time.Since(startTime)
	if processingTime > initProcessThreshold {
		objType := "<null>"
		if len(initEvents) > 0 {
			objType = reflect.TypeOf(initEvents[0].Object).String()
		}
		glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
	}

	// Deferred calls run in reverse order: Stop first, then close(c.result).
	defer close(c.result)
	defer c.Stop()
	for {
		event, ok := <-c.input
		if !ok {
			return
		}
		// only send events newer than resourceVersion
		if event.ResourceVersion > resourceVersion {
			c.sendWatchCacheEvent(event)
		}
	}
}
2.4 函数func (c *cacheWatcher) sendWatchCacheEvent
对watchCacheEvent进行Filter,然后发送到cacheWatcher的result channel中
// sendWatchCacheEvent filters event for this watcher and, if relevant, pushes
// the resulting watch.Event onto c.result.
//
// The event type seen by the client depends on whether the current and the
// previous object pass the watcher's filter: only current -> Added, both ->
// Modified, only previous -> Deleted (carrying a copy of the previous object
// stamped with the event's resource version), neither -> nothing is sent.
//
// NOTE: this function must not modify <event>; objects are deep-copied before
// being handed out.
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	passesNow := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
	passedBefore := event.PrevObject != nil && c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)

	var out watch.Event
	switch {
	case !passesNow && !passedBefore:
		// Watcher is not interested in that object.
		return
	case passesNow && passedBefore:
		out = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
	case passesNow:
		out = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
	default:
		// Deletion from the watcher's point of view: previous object content,
		// but with the event's resource version.
		oldObj := event.PrevObject.DeepCopyObject()
		err := c.versioner.UpdateObject(oldObj, event.ResourceVersion)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
		}
		out = watch.Event{Type: watch.Deleted, Object: oldObj}
	}

	// Non-blocking check first, so an already-stopped watcher never sends.
	select {
	case <-c.done:
		return
	default:
	}

	// Blocking send that gives up as soon as the watcher is stopped.
	select {
	case <-c.done:
	case c.result <- out:
	}
}
```
三. 实现接口部分
pkg/registry/core/pod/storage/storage.go
3.1 NewStorage函数
PodStorage.Pod.Store封装了对etcd的操作;
```
// NewStorage assembles the PodStorage bundle: a generic registry store for
// pods plus the REST wrappers for the pod subresources built on top of it.
// The status subresource works on a copy of the store whose update strategy
// is pod.StatusStrategy. It panics if the store cannot be completed with the
// supplied options.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
	podStore := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &api.Pod{} },
		NewListFunc:              func() runtime.Object { return &api.PodList{} },
		PredicateFunc:            pod.MatchPod,
		DefaultQualifiedResource: api.Resource("pods"),

		CreateStrategy:      pod.Strategy,
		UpdateStrategy:      pod.Strategy,
		DeleteStrategy:      pod.Strategy,
		ReturnDeletedObject: true,

		TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
	}

	storeOpts := &generic.StoreOptions{
		RESTOptions: optsGetter,
		AttrFunc:    pod.GetAttrs,
		TriggerFunc: pod.NodeNameTriggerFunc,
	}
	err := podStore.CompleteWithOptions(storeOpts)
	if err != nil {
		panic(err) // TODO: Propagate error up
	}

	// Shallow copy so that only the status store gets the status strategy.
	statusCopy := *podStore
	statusCopy.UpdateStrategy = pod.StatusStrategy

	var bundle PodStorage
	bundle.Pod = &REST{podStore, proxyTransport}
	bundle.Binding = &BindingREST{store: podStore}
	bundle.Eviction = newEvictionStorage(podStore, podDisruptionBudgetClient)
	bundle.Status = &StatusREST{store: &statusCopy}
	bundle.Log = &podrest.LogREST{Store: podStore, KubeletConn: k}
	bundle.Proxy = &podrest.ProxyREST{Store: podStore, ProxyTransport: proxyTransport}
	bundle.Exec = &podrest.ExecREST{Store: podStore, KubeletConn: k}
	bundle.Attach = &podrest.AttachREST{Store: podStore, KubeletConn: k}
	bundle.PortForward = &podrest.PortForwardREST{Store: podStore, KubeletConn: k}
	return bundle
}
```
3.2 CompleteWithOptions函数
路径:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
主要函数是GetRESTOptions
```
// CompleteWithOptions fills in the store's storage backend from options.
// NOTE: abridged excerpt — prefix, keyFunc, attrFunc and triggerFunc are
// computed in code that is not shown here.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
// Look up the REST options for this store's resource.
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
// Only build a backend when none was injected; the decorator returns both
// the storage and its destroy function.
if e.Storage == nil {
e.Storage, e.DestroyFunc = opts.Decorator(
opts.StorageConfig,
e.NewFunc(),
prefix,
keyFunc,
e.NewListFunc,
attrFunc,
triggerFunc,
)
}
return nil
}
3.3 GetRESTOptions函数
路径:/vendor/k8s.io/apiserver/pkg/server/options/etcd.go
调用genericregistry.StorageWithCacher创建cache store
// GetRESTOptions builds the generic.RESTOptions for resource.
//
// The storage config comes from the storage factory. The decorator defaults
// to generic.UndecoratedStorage; when the watch cache is enabled it is
// replaced by genericregistry.StorageWithCacher, sized by the per-resource
// entry in WatchCacheSizes or, failing that, by DefaultWatchCacheSize.
// An error is returned when no storage config can be found or the cache
// sizes cannot be parsed.
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	cfg, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	restOpts := generic.RESTOptions{
		StorageConfig:           cfg,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
	}
	if !f.Options.EnableWatchCache {
		return restOpts, nil
	}

	sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
	if err != nil {
		return generic.RESTOptions{}, err
	}
	capacity, found := sizes[resource]
	if !found {
		capacity = f.Options.DefaultWatchCacheSize
	}
	restOpts.Decorator = genericregistry.StorageWithCacher(capacity)
	return restOpts, nil
}
3.4 StorageWithCacher函数
路径: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
根据配置创建cacher
NewRawStorage根据配置类型,具体是创建etcd v2或者v3 client
// StorageWithCacher returns a storage decorator that builds the raw storage
// for storageConfig and wraps it in a cacher of the given capacity.
//
// With capacity == 0 the raw storage and its destroy function are returned
// untouched. Otherwise the returned destroy function stops the cacher first
// and then destroys the raw storage; it is also registered through
// RegisterStorageCleanup.
func StorageWithCacher(capacity int) generic.StorageDecorator {
	return func(
		storageConfig *storagebackend.Config,
		objectType runtime.Object,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

		rawStorage, destroyRaw := generic.NewRawStorage(storageConfig)
		if capacity == 0 {
			glog.V(5).Infof("Storage caching is disabled for %T", objectType)
			return rawStorage, destroyRaw
		}

		cacher := storage.NewCacherFromConfig(storage.CacherConfig{
			CacheCapacity:        capacity,
			Storage:              rawStorage,
			Versioner:            etcdstorage.APIObjectVersioner{},
			Type:                 objectType,
			ResourcePrefix:       resourcePrefix,
			KeyFunc:              keyFunc,
			NewListFunc:          newListFunc,
			GetAttrsFunc:         getAttrsFunc,
			TriggerPublisherFunc: triggerFunc,
			Codec:                storageConfig.Codec,
		})

		teardown := func() {
			cacher.Stop()
			destroyRaw()
		}
		RegisterStorageCleanup(teardown)

		return cacher, teardown
	}
}
3.5 NewCacherFromConfig函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
创建服务于list-watch的cacher:先创建watchCache对象和cacherListerWatcher对象,其中cacherListerWatcher是ListerWatcher接口的实现,实现了List()和Watch()方法
构建Cacher对象
(1) watchCache是一个结构体,用来存储apiserver从etcd watch到的对象
(2) watchers是一个indexedWatchers结构体,当kubelet,scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源从apiserver发送到kubelet, scheduler
(3) Reflector结构体数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
// Both methods take the same metav1.ListOptions value type.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
(4) incoming是一个缓冲大小为100、用于接收watchCacheEvent的channel
协程cacher.dispatchEvents:从incoming channel中取出watchCacheEvent,并分发到各个watcher的input channel中
// dispatchEvents is the cacher's dispatch loop: it hands every event received
// on c.incoming to dispatchEvent and returns once c.incoming is closed or
// c.stopCh fires.
func (c *Cacher) dispatchEvents() {
	for {
		select {
		case <-c.stopCh:
			return
		case ev, open := <-c.incoming:
			if !open {
				return
			}
			c.dispatchEvent(&ev)
		}
	}
}
// dispatchEvent fans event out to the registered watchers.
//
// Watchers in allWatchers always receive the event. Watchers grouped under
// valueWatchers receive it only if their group key is one of the event's
// trigger values; when trigger values are not supported for the event, every
// group receives it.
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	values, triggersKnown := c.triggerValues(event)

	for _, w := range c.watchers.allWatchers {
		w.add(event, c.dispatchTimeoutBudget)
	}

	if !triggersKnown {
		// No trigger information: deliver to every value-indexed watcher.
		for _, group := range c.watchers.valueWatchers {
			for _, w := range group {
				w.add(event, c.dispatchTimeoutBudget)
			}
		}
		return
	}

	for _, v := range values {
		for _, w := range c.watchers.valueWatchers[v] {
			w.add(event, c.dispatchTimeoutBudget)
		}
	}
}
协程cacher.startCaching
// NewCacherFromConfig builds a Cacher from config and starts its background
// goroutines. NOTE: abridged excerpt — the fields of the Cacher literal are
// elided, which is why several locals below look unused.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
stopCh := make(chan struct{})
cacher := &Cacher{
}
// Hand cacher.processEvent to the watch cache via SetOnEvent.
watchCache.SetOnEvent(cacher.processEvent)
// Start the dispatch loop in its own goroutine.
go cacher.dispatchEvents()
// Track the caching goroutine in stopWg; it signals Done when it exits.
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
// Call startCaching through wait.Until with a one-second period and stopCh,
// skipping the call once the cacher is stopped.
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
)
}()
return cacher
}
3.6 startCaching函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
主要函数是reflector.ListAndWatch,它负责把远端etcd的数据同步到本地,并存放在watchCache中
// startCaching runs one ListAndWatch cycle of the cacher's reflector.
//
// The watch cache's replace callback marks the cacher ready; if that callback
// ran at least once, the cacher is marked not ready again when this function
// returns. All current watchers are terminated before ListAndWatch starts,
// and a ListAndWatch error is only logged.
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	listed := false
	onReplace := func() {
		listed = true
		c.ready.set(true)
	}
	c.watchCache.SetOnReplace(onReplace)

	defer func() {
		if !listed {
			return
		}
		c.ready.set(false)
	}()

	c.terminateAllWatchers()

	err := c.reflector.ListAndWatch(stopChannel)
	if err != nil {
		glog.Errorf("unexpected ListAndWatch error: %v", err)
	}
}
3.7 ListAndWatch函数
路径: vendor/k8s.io/client-go/tools/cache/reflector.go
首先list所有条目,根据版本进行watch
调用listerWatcher.List方法获得
调用listerWatcher.Watch进行操作
// ListAndWatch lists through the reflector's listerWatcher and then watches
// in a loop. NOTE: abridged excerpt — options, start, resourceVersion and
// resyncerrc are declared in code not shown, and several error checks are
// elided.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
list, err := r.listerWatcher.List(options)
r.metrics.listDuration.Observe(time.Since(start).Seconds())
// Read the list's metadata to obtain its resource version.
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
// Open a watch and hand it to watchHandler, repeating until watchHandler
// reports an error.
for {
w, err := r.listerWatcher.Watch(options)
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
// In this excerpt the watchHandler error ends the loop and nil is returned.
return nil
}
}
}
3.8 newCacherListerWatcher函数
路径: vendor/k8s.io/apiserver/pkg/storage/cacher.go
storage接口包含list watch接口
// newCacherListerWatcher wraps storage in a cache.ListerWatcher that lists
// and watches under resourcePrefix and allocates list objects with
// newListFunc.
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	lw := new(cacherListerWatcher)
	lw.storage = storage
	lw.resourcePrefix = resourcePrefix
	lw.newListFunc = newListFunc
	return lw
}
// List implements cache.ListerWatcher. It allocates a fresh list object and
// asks the wrapped storage to fill it with everything under the resource
// prefix; the options argument is not consulted.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
	result := lw.newListFunc()
	err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// Watch implements cache.ListerWatcher. It opens an unfiltered WatchList on
// the wrapped storage under the resource prefix, starting from the resource
// version carried in options.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
	fromVersion := options.ResourceVersion
	return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, fromVersion, Everything)
}
3.9 NewRawStorage函数
路径: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
往回找到,最终去向是etcd,应该是k8s1.6版本从v2更新至v3
// NewRawStorage creates the storage backend described by config and returns
// it together with its destroy function.
//
// The signature has no error result, so a failure to create the backend is
// fatal: it is logged with glog.Fatalf instead of being silently dropped.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
	s, d, err := factory.Create(*config)
	if err != nil {
		glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
	}
	return s, d
}
// Create builds the storage backend selected by c.Type: etcd2 for
// StorageTypeETCD2, etcd3 for StorageTypeETCD3 or an unset type, and an error
// for anything else.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	if c.Type == storagebackend.StorageTypeETCD2 {
		return newETCD2Storage(c)
	}
	if c.Type == storagebackend.StorageTypeUnset || c.Type == storagebackend.StorageTypeETCD3 {
		return newETCD3Storage(c)
	}
	return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
3.10 newETCD3Storage函数
路径: vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
也就是说,list和watch接口最终是由基于etcd v3 client的store实现的
// newETCD3Storage creates an etcd v3 client and wraps it in an etcd3 store.
// NOTE: abridged excerpt — cfg, transformer and destroyFunc are built in code
// not shown, and the check of err is elided.
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := clientv3.New(cfg)
// c.Quorum selects between the two etcd3 constructors.
if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursi