k8s list-watch
Background
Reference: "kubernetes设计理念分析 | 从运行流程和list-watch看kubernetes系统的设计理念"
The Kubernetes components all talk to the api-server through the list-watch mechanism. list-watch has to solve the following problems:
- Real-time: every component must learn about data changes promptly;
- Ordering: changes must arrive in order; if a delete were delivered before the corresponding create, the result would be a mess;
- Reliability: despite network jitter and similar failures, messages must still be delivered (delivery guarantees in the spirit of AMQP).
Solutions
Real-time
HTTP streaming: the client opens a long-lived HTTP request, and the server sends a response whenever the data changes. With HTTP/2, connection multiplexing allows several such long-lived HTTP streams to share a single long-lived TCP connection.
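As a rough illustration only (the components themselves go through client-go), this is what such a streaming watch looks like at the HTTP level. The sketch assumes an apiserver proxied at localhost:8001 (e.g. via kubectl proxy) and uses the pods endpoint of the default namespace:

package main

import (
	"bufio"
	"fmt"
	"net/http"
)

func main() {
	// watch=true keeps the response open; the apiserver appends one JSON event per line,
	// e.g. {"type":"ADDED","object":{...}}, for as long as the connection lives.
	resp, err := http.Get("http://localhost:8001/api/v1/namespaces/default/pods?watch=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // a single event can be large
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}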
Ordering
Every kind of resource has a resourceVersion that is incremented whenever the resource changes. The consistency of resourceVersion is guaranteed by etcd, whose revision is globally monotonically increasing (similar to Redis INCR).
Consequently, the events in a client's watch response are already ordered by resourceVersion.
The resourceVersion parameter, as the API reference describes it:
When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history. When specified for list:
- if unset, then the result is returned from remote storage based on quorum-read flag;
- if it's 0, then we simply return what we currently have in cache, no guarantee;
- if set to non zero, then the result is at least as fresh as given rv. (optional)
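A sketch of those three list cases with client-go; it assumes a recent client-go where List takes a context, and a clientset built elsewhere:

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listPods shows the effect of the resourceVersion parameter on a list:
//   rv == ""   -> quorum read from etcd (freshest, most expensive)
//   rv == "0"  -> served from the apiserver cache, no freshness guarantee
//   rv == "42" -> result at least as fresh as resourceVersion 42
func listPods(cs kubernetes.Interface, rv string) error {
	pods, err := cs.CoreV1().Pods("default").List(context.TODO(),
		metav1.ListOptions{ResourceVersion: rv})
	if err != nil {
		return err
	}
	fmt.Printf("list resourceVersion=%s, %d pods\n", pods.ResourceVersion, len(pods.Items))
	return nil
}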
Reliability
list-watch always lists first, fetching everything currently in the apiserver cache, and then watches starting from the final resourceVersion of that list. If the connection is lost to network jitter, the client lists again to pick up whatever it missed and then resumes watching from that point, so no data is lost.
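A minimal list-then-watch sketch along these lines with client-go (same assumptions as the previous sketch; error handling kept short):

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listThenWatch lists first, remembers the list's resourceVersion, and then starts the
// watch from exactly that version, so nothing that changes in between can be missed.
func listThenWatch(ctx context.Context, cs kubernetes.Interface) error {
	pods, err := cs.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	// ... process pods.Items: this is the full current state ...

	w, err := cs.CoreV1().Pods("default").Watch(ctx, metav1.ListOptions{
		ResourceVersion: pods.ResourceVersion,
	})
	if err != nil {
		return err
	}
	defer w.Stop()
	for ev := range w.ResultChan() {
		fmt.Println(ev.Type) // ADDED / MODIFIED / DELETED, ordered by resourceVersion
	}
	return nil
}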
Watch optimizations
Problems
- Watch requests used to go straight to etcd; that many long-lived connections put pressure on both etcd and the apiserver;
- many identical watch requests led to a great deal of duplicated serialization/deserialization work.
Optimizations
- For each REST resource type, the apiserver itself watches etcd and caches the data in the corresponding storage;
- when the apiserver receives a watch request, it serves it from that REST storage only, avoiding a direct connection to etcd;
- list returns the full data set, and every watch failure triggers a relist. At large scale, if every client relists at the same moment the server cannot cope, so EtcdResync is provided for that case (a sketch of the client-side relist check follows this list);
- to shed useless long-lived connections (clients that have died), the apiserver attaches a random timeout to every watch.
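The relist fallback mentioned above can be detected on the client side roughly like this; a sketch assuming the raw watch channel is consumed directly (client-go's Reflector does the equivalent internally):

package example

import (
	"net/http"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

// needsRelist reports whether a watch event says our resourceVersion has already been
// compacted away (HTTP 410 Gone), in which case the only way forward is a full relist.
func needsRelist(ev watch.Event) bool {
	if ev.Type != watch.Error {
		return false
	}
	status, ok := ev.Object.(*metav1.Status)
	return ok && status.Code == http.StatusGone
}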
Reflector
Within the Kubernetes components, NewInformer() in k8s.io/client-go/tools/cache/controller.go is used to monitor REST resources. At its core is the Reflector: the Reflector watches the specified REST resource and records every change in a store, usually a DeltaFIFO ("DeltaFIFO is like FIFO, but allows you to process deletes").
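A sketch of wiring this up through NewInformer; the resource (pods), namespace and resync period are arbitrary choices, and clientset construction is omitted:

package example

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runPodInformer builds the ListerWatcher -> Reflector -> queue -> store pipeline
// described above and calls the handlers for every change popped off the queue.
func runPodInformer(clientset *kubernetes.Clientset, stop <-chan struct{}) {
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", "default", fields.Everything())

	store, controller := cache.NewInformer(lw, &v1.Pod{}, 30*time.Second,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) { fmt.Println("add:", obj.(*v1.Pod).Name) },
			UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update:", newObj.(*v1.Pod).Name) },
			DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
		})
	_ = store // kept in sync with the watched resources by the controller

	controller.Run(stop) // blocks; internally this drives the Reflector's ListAndWatch loop
}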
k8s.io/client-go/tools/cache/reflector.go
// ListAndWatch first lists all items and gets the resource version at the moment of call,
// and then uses that resource version to watch.
// It returns an error if ListAndWatch didn't even try to initialize the watch.
// (abridged here: error handling and the resync goroutine are elided)
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var resourceVersion string

	// List from the apiserver cache first ("0" = whatever the cache currently holds).
	options := metav1.ListOptions{ResourceVersion: "0"}
	list, err := r.listerWatcher.List(options)
	listMetaInterface, err := meta.ListAccessor(list)
	resourceVersion = listMetaInterface.GetResourceVersion()
	r.setLastSyncResourceVersion(resourceVersion)

	resyncerrc := make(chan error, 1) // fed by the resync goroutine (elided here)
	for {
		// Randomized so that all watchers opened by this process do not expire at once.
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
		}
		w, err := r.listerWatcher.Watch(options)
		r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)
	}
}
// watchHandler watches w and keeps *resourceVersion up to date.
// (abridged here: error and channel-closure handling are elided)
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	for {
		select {
		// events streamed over the watch connection
		case event, ok := <-w.ResultChan():
			meta, err := meta.Accessor(event.Object)
			newResourceVersion := meta.GetResourceVersion()
			// Mirror every event into the local store (typically a DeltaFIFO).
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
			case watch.Modified:
				err := r.store.Update(event.Object)
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Remember how far we have got, so a reconnect resumes from this point.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
		}
	}
}