List-Watch是kubernetes的核心机制。组件kubelet、kube-controller-manager、kube-scheduler需要监控各种资源(pod、service等)的变化,当这些对象发生变化时(add、delete、update),kube-apiserver会主动通知这些组件。这个过程类似一个发布-订阅系统。本文章将从代码角度探究一下list-watch的实现方式。
源码分析是kubernetes tag v1.9.0
第一部分:kube-apiserver对etcd的List-watch机制
构建PodStorage
pkg/registry/core/pod/storage.go
NewStorage方法
1. kube-apiserver针对每一类资源(pod、service、endpoint、replication controller、depolyments)都会构建Storage对象,如:PodStorage;
2. PodStorage.Pod.Store封装了对etcd的操作;
3. store.CompleteWithOptions会调用etcdOptions.GetRESTOptions,此方法将
调用generic.UndecoratedStorage创建无cache的Store;
或者调用genericregistry.StorageWithCacher创建带Cache的Store;
StorageWithCacher
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
StorageWithCacher方法
1. 调用NewRawStorage,它将构建etcdHelper,etcdHelper实现Storage.Interface接口,封装了对etcd的操作;
2. 调用NewCacherFromConfig,将创建Cacher对象;
创建Cacher
staging/src/k8s.io/apiserver/pkg/storage/cacher.go
NewCacherFromConfig方法
1. 首先,创建watchCache对象和cacheListerWatcher对象,cacheListWatcher对象是ListerWatcher接口实现,实现了List()和Watch()方法;
2. 构建Cacher对象,主要的数据成员:watchCache、reflector、watchers及incoming channel;
(1) watchCache是一个cache,用来存储apiserver从etcd那里watch到的对象;
(2) watchers是一个map,map的值类型为cacheWatcher,当kubelet、kube-scheduler需要watch某类资源时,他们会向kube-apiserver发起watch请求,kube-apiserver就会生成一个cacheWatcher,cacheWatcher负责将watch的资源通过http从apiserver传递到kubelet、kube-scheduler;
(3) Reflector对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();listerWatcher包装了Storage,主要是将watch到的对象存到watchCache中;
(4) incoming channel接收watchCacheEvent;
3. 协程调用cacher.dispatchEvents,watchCache将incoming channel接收watchCacheEvent添加到watchers的inputChan中;
4. 协程调用cacher.startCaching;
StartCaching
staging/src/k8s.io/client-go/tools/cache/reflector.go
ListAndWatch方法
1 执行cacheListerWatcher的List方法和Watch方法;
2 调用reflector的watchHandler方法;
cacheListWatcher.List/cacheListWatcher.Watch
staging/src/k8s.io/apiserver/pkg/storage/cacher.go
List方法和Watch方法
1 List方法将调用storage.List方法,这里是etcdHelper.List方法;
2 Watch方法将调用storage.watch方法,这里是etcdHelper.WatchList方法;
etcdHelper.List/etcdHelper.Watch
staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_helper.go
List方法/WatchList方法
1. etcdHelper对象是Storage接口对象的实现;
2. etcdHelper的List方法:
(1) 获取etcd的对象(包括resourceVersion信息);
3. etcdHelper的WatchList方法:
(1) 创建etcdWatcher;
(2) etcdWatcher对象,实现了Watch接口;
(3) etcdWatcher对象,主要的数据成员是etcdIncoming channel和outgoing channel;
(4) 协程执行etcdWatcher.translate;
(5) 最后,协程运行etcdWatcher.etcdWatch;
etcdWatcher.etcdWatch
staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
etcdWatch方法
1. 如果resourceVersion==0, 运行etcdGetInitialWatchState(),获取所有的pods,并将结果输入到etcdIncoming channel;
2. 之后,不停的调用watcher.Next(),并将结果输入到etcdIncoming channel;
etcdWatcher.translate
staging/src/k8s.io/apiserver/pkg/storage/etcd/etcd_watcher.go
translate方法
1. 读取etcdIncoming channel信息;
2. 调用etcdWatcher.sendResult进行转化;
3. 发送到outgoing channel;
reflector.watchHandler
staging/src/k8s.io/client-go/tools/cache/reflector.go
watchHandler方法
1. 读取outgoing channel信息,操作watchCache;
操作watchCache
staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go
Add方法
Delete方法
Get方法
Update方法
处理事件watchCache.processEvent
staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go
processEvent方法
1. 创建watchCacheEvent
2. 调用watchCache.updateCache,更新watchCache;
到此分析完kube-apiserver对etcd的watch机制,除此之外,kube-apiserver会向其他组件提供watch接口,下面将分析kube-apiserver的watch API。
第二部分:kube-apiserver的watch restful API
kube-apiserver提供watch restful API给其他组件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch restful API的处理流程和PUT、DELETE、GET等REST API处理流程类似。
registerResourceHandlers
staging/src/k8s.io/apiserver/pkg/endpoints/installer.go
registerResourceHandlers方法
ListResource
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go
ListResource方法
1.调用rw.watch方法,这里将会调用Store.watch;
2.调用serveWatch方法;
Store.watch
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
watch方法和watchPredicate方法
1. 调用Storage.Watch方法和Storage.WatchList方法,这里将调用Cacher.watch方法和Cacher.WatchList方法
Cacher.watch
staging/src/k8s.io/apiserver/pkg/storage/cacher.go
watch方法和watchList方法
1. watch方法中将调用newCacheWatcher;
2. newCacheWatcher方法:
(1) 生成一个watcher,并将watcher插入到cacher.watchers中;
(2) 协程调用cacheWatcher.process方法,此方法将会操作input channel的消息;
操作input channel
staging/src/k8s.io/apiserver/pkg/storage/cacher.go
cacheWatcher.process方法
1. 读取input channel的信息,并调用sendWatchCacheEvent方法;
sendWatchCacheEvent
staging/src/k8s.io/apiserver/pkg/storage/cacher.go
cacheWatcher.sendWatchCacheEvent方法
1. kube-apiserver的watch会带过滤功能;
2. 对watchCacheEvent进行Filter,发送到cacher.result channel中;
serveWatch
apiserver/pkg/endpoints/handlers/rest.go
serveWatch方法
1. 对result Channel信息进行序列化,并发送给调用者;