背景
当informer的list-watch中的list请求以及普通的list请求较多时,会对kube-apiserver的内存造成较大的瞬时压力,甚至导致oom,从而降低稳定性
引入watchlist后,可以让list请求走watch的方式(称为watchlist),内存开销更稳定
场景
list
简单总结
通过给list的options设置一些参数,让list请求走watch的方式,称为watchlist;收到带有initial-events-end注解的bookmark事件后,watchlist结束
源码
util/watchlist/watch_list.go中
func PrepareWatchListOptionsFromListOptions(listOptions metav1.ListOptions) (metav1.ListOptions, bool, error) {
判断是否开启watchlistclient特性
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return metav1.ListOptions{}, false, nil
}
...
如果options中设置了limit>0且resourceVersion不为"0",则不使用watchlist(返回false,不报错)
if listOptions.Limit > 0 && listOptions.ResourceVersion != "0" {
return metav1.ListOptions{}, false, nil
}
...
如果resourceVersionMatch为exact,则不使用watchlist(返回false,不报错)
if listOptions.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
return metav1.ListOptions{}, false, nil
}
...
设置resourceVersionMatch为notOlderThan
watchListOptions.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
设置options watch为true
watchListOptions.Watch = true
设置AllowWatchBookmarks为true
watchListOptions.AllowWatchBookmarks = true
设置SendInitialEvents为true
watchListOptions.SendInitialEvents = ptr.To(true)
...
返回最终应该使用的options
return watchListOptions, true, nil
}
gentype/type.go中
func (l *alsoLister[T, L]) List(ctx context.Context, opts metav1.ListOptions) (L, error) {
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
...
} else if hasWatchListOptionsPrepared {
满足watchlist条件则使用watchlist
result, err := l.watchList(ctx, watchListOptions)
if err == nil {
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, "watchlist request for "+l.client.resource, l.list, opts, result)
return result, nil
}
klog.Warningf("The watchlist request for %s ended with an error, falling back to the standard LIST semantics, err = %v", l.client.resource, err)
}
result, err := l.list(ctx, opts)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, "list request for "+l.client.resource, l.list, opts, result)
}
return result, err
}
func (l *alsoLister[T, L]) watchList(ctx context.Context, opts metav1.ListOptions) (result L, err error) {
...
result = l.newList()
使用watchlist
err = l.client.client.Get().
UseProtobufAsDefaultIfPreferred(l.client.prefersProtobuf).
NamespaceIfScoped(l.client.namespace, l.client.namespace != "").
Resource(l.client.resource).
VersionedParams(&opts, l.client.parameterCodec).
Timeout(timeout).
WatchList(ctx).
Into(result)
...
}
rest/request.go中
func (r *Request) WatchList(ctx context.Context) WatchListResult {
...
如果未开启watchlistclient特性则报错
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return WatchListResult{err: fmt.Errorf("%q feature gate is not enabled", clientfeatures.WatchListClient)}
}
发送watchlist请求
w, d, err := r.watchInternal(ctx)
...
处理watchlist响应
return r.handleWatchList(ctx, w, d)
}
func (r *Request) watchInternal(ctx context.Context) (watch.Interface, runtime.Decoder, error) {
}
func (r *Request) handleWatchList(ctx context.Context, w watch.Interface, negotiatedObjectDecoder runtime.Decoder) WatchListResult {
}
informer
简单总结
informer的list-watch中的list操作改为使用watch(而不是一次性list所有数据)来完成,称为watchlist;收到带有initial-events-end注解的bookmark事件后,watchlist结束
源码
tools/cache/reflector.go中
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
...
if r.UseWatchList == nil {
如果开启了watchlistclient特性则设置为true
r.UseWatchList = ptr.To(clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient))
}
...
}
func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
...
是否使用watchlist
useWatchList := ptr.Deref(r.UseWatchList, false)
是否降级到list
fallbackToList := !useWatchList
if useWatchList {
w, err = r.watchList(ctx)
...
降级到list
if fallbackToList {
err = r.list(ctx)
...
}
...
return r.watchWithResync(ctx, w)
}
func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
...
for {
...
临时store
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
...
开始listwatch
watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
...
如果收到bookmark则认为watchlist结束,即收到最新的数据了
if watchListBookmarkReceived {
break
}
...
替换store为临时store的内容
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
}
...
}
func handleListWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errCh chan error,
) (bool, error) {
设置如果收到bookmark则结束watch
exitOnWatchListBookmarkReceived := true
开始watch
return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
}
func handleAnyWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnWatchListBookmarkReceived bool,
clock clock.Clock,
errCh chan error,
) (bool, error) {
...
loop:
for {
select {
...
收到事件
case event, ok := <-w.ResultChan():
...
switch event.Type {
如果是增删改事件则对store进行增删改操作
case watch.Added:
err := store.Add(event.Object)
...
如果是bookmark事件
case watch.Bookmark:
判断事件的注解中是否包含key k8s.io/initial-events-end且value为true
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
设置已收到bookmark事件
watchListBookmarkReceived = true
}
...
如果已收到bookmark事件且设置了收到bookmark事件后结束watch
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
...
结束watch
return watchListBookmarkReceived, nil
}
...
}
list结束开始watch
func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error {
...
开始watch
return r.watch(ctx, w, resyncerrc)
...
}
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
...
for {
...
开始watch
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc)
...
}
...
}
func handleWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errCh chan error,
) error {
设置exitOnWatchListBookmarkReceived为false,即收到bookmark后不结束watch
exitOnWatchListBookmarkReceived := false
开始watch
_, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
return err
}