背景
1.34版本的k8s中apiserver处理list请求的逻辑由于很多新特性已经变化比较大,这边做个简单的总结
源码
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中
处理list请求
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
...
result, err := delegator.ShouldDelegateList(opts, c.cacher)
if err != nil {
return err
}
...
委托给etcd层处理
if result.ShouldDelegate {
return c.storage.GetList(ctx, key, opts, listObj)
}
...
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
开启了ResilientWatchCacheInitialization特性
if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
cache未就绪且没有labelSelector且没有fieldselector且携带了大于0的limit,则委托给etcd层处理
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
未开启ResilientWatchCacheInitialization特性
if listRV == 0 && !c.cacher.Ready() {
listRV == 0但是cache还未就绪,则委托给etcd层处理
return c.storage.GetList(ctx, key, opts, listObj)
}
}
从cache查询
err = c.cacher.GetList(ctx, key, opts, listObj)
if err != nil {
从cache查询报错
如果是resourceversion过期了但是开启了ListFromCacheSnapshot特性,则委托给etcd层处理
if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
return c.storage.GetList(ctx, key, opts, listObj)
}
if result.ConsistentRead {
如果是一致性读
如果resourceversion过大
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
委托给etcd层查询
err = c.storage.GetList(ctx, key, opts, listObj)
}
...
}
返回err
return err
}
...
return nil
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go
是否需要委托给etcd层处理list请求
func ShouldDelegateList(opts storage.ListOptions, cache Helper) (Result, error) {
检查list opt中ResourceVersionMatch设置了什么值
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
判断是否需要委托给etcd层处理exact rv的list请求
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
case metav1.ResourceVersionMatchNotOlderThan:
ResourceVersionMatchNotOlderThan则从cache处理
return Result{ShouldDelegate: false}, nil
case "":
// Continue
if len(opts.Predicate.Continue) > 0 {
return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
}
limit>0且resourceversion非空且resourceversion不是0
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
判断是否需要委托给etcd层处理exact rv的list请求
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
}
// Consistent Read
if opts.ResourceVersion == "" {
判断是否委托给etcd处理一致性读
return cache.ShouldDelegateConsistentRead()
}
cache处理
return Result{ShouldDelegate: false}, nil
default:
其他则委托给etcd层处理
return Result{ShouldDelegate: true}, nil
}
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
不是递归查询或者watchcache snapshots为nil则委托给etcd层处理
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
解析continuerv
_, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix)
if err != nil {
return delegator.Result{}, err
}
大于0
if continueRV > 0 {
判断是否需要委托给etcd层处理exact rv的list请求
return c.shouldDelegateExactRV(uint64(continueRV))
} else {
判断是否支持一致性读
return c.ShouldDelegateConsistentRead()
}
}
是否委托给etcd层处理一致性读
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
return delegator.Result{
ConsistentRead: true,
如果不支持一致性读,则委托给etcd层处理
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
判断是否需要委托给etcd层处理exact rv的list请求
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
不是递归查询或者watchcache snapshots为nil则委托给etcd层处理
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
极细listrv
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return delegator.Result{}, err
}
return c.shouldDelegateExactRV(listRV)
}
判断是否需要委托给etcd层处理exact rv的list请求
func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) {
watchcache不够新(也就是watchcache中的resourceversion小于传入的)
if c.watchCache.notFresh(rv) {
return delegator.Result{
如果不支持一致性读,则委托给etcd层处理
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
从watchcache snapshot中拿resourceversion
_, canServe := c.watchCache.snapshots.GetLessOrEqual(rv)
return delegator.Result{
拿不到则委托给etcd层处理
ShouldDelegate: !canServe,
}, nil
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go
判断是否支持一致性读
func ConsistentReadSupported() bool {
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
如果开启了ConsistentListFromCache(1.34强制开启)且etcd支持RequestWatchProgress特性(小于3.4.31或者3.5.0到3.5.13之间不支持)
return consistentListFromCacheEnabled && requestWatchProgressSupported
}
指标
名称
apiserver_watchcache_consistent_read_total
label
group
resource
success
fallback