从源码看k8s 1.34对list请求如何处理

背景

1.34版本的k8s中apiserver处理list请求的逻辑由于很多新特性已经变化比较大,这边做个简单的总结

源码

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中

处理list请求
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
    ...
    result, err := delegator.ShouldDelegateList(opts, c.cacher)
    if err != nil {
        return err
    }
    ...
    委托给etcd层处理
    if result.ShouldDelegate {
        return c.storage.GetList(ctx, key, opts, listObj)
    }
    ...
    if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
        开启了ResilientWatchCacheInitialization特性
        if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
            cache未就绪且没有labelSelector且没有fieldselector且携带了大于0的limit,则委托给etcd层处理
            return c.storage.GetList(ctx, key, opts, listObj)
        }
    } else {
        未开启ResilientWatchCacheInitialization特性
        if listRV == 0 && !c.cacher.Ready() {
            listRV == 0但是cache还未就绪,则委托给etcd层处理
            return c.storage.GetList(ctx, key, opts, listObj)
        }
    }
    从cache查询
    err = c.cacher.GetList(ctx, key, opts, listObj)
    if err != nil {
        从cache查询报错
        如果是resourceversion过期了但是开启了ListFromCacheSnapshot特性,则委托给etcd层处理
        if errors.IsResourceExpired(err) && utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
            return c.storage.GetList(ctx, key, opts, listObj)
        }
        if result.ConsistentRead {
            如果是一致性读
            如果resourceversion过大
            if storage.IsTooLargeResourceVersion(err) {
                fallback = "true"
                委托给etcd层查询
                err = c.storage.GetList(ctx, key, opts, listObj)
            }
            ...
        }
        返回err
        return err
    }
    ...
    return nil
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go

是否需要委托给etcd层处理list请求
func ShouldDelegateList(opts storage.ListOptions, cache Helper) (Result, error) {
    检查list opt中ResourceVersionMatch设置了什么值
    switch opts.ResourceVersionMatch {
    case metav1.ResourceVersionMatchExact:
        判断是否需要委托给etcd层处理exact rv的list请求
        return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
    case metav1.ResourceVersionMatchNotOlderThan:
        ResourceVersionMatchNotOlderThan则从cache处理
        return Result{ShouldDelegate: false}, nil
    case "":
        // Continue
        if len(opts.Predicate.Continue) > 0 {
            return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
        }
        limit>0且resourceversion非空且resourceversion不是0
        if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
            判断是否需要委托给etcd层处理exact rv的list请求
            return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
        }
        // Consistent Read
        if opts.ResourceVersion == "" {
            判断是否委托给etcd处理一致性读
            return cache.ShouldDelegateConsistentRead()
        }
        cache处理
        return Result{ShouldDelegate: false}, nil
    default:
        其他则委托给etcd层处理
        return Result{ShouldDelegate: true}, nil
    }
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中


func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
    不是递归查询或者watchcache snapshots为nil则委托给etcd层处理
    if !recursive || c.watchCache.snapshots == nil {
        return delegator.Result{ShouldDelegate: true}, nil
    }
    解析continuerv
    _, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix)
    if err != nil {
        return delegator.Result{}, err
    }
    大于0
    if continueRV > 0 {
        判断是否需要委托给etcd层处理exact rv的list请求
        return c.shouldDelegateExactRV(uint64(continueRV))
    } else {
        判断是否支持一致性读
        return c.ShouldDelegateConsistentRead()
    }
}


是否委托给etcd层处理一致性读
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
    return delegator.Result{
        ConsistentRead: true,
        如果不支持一致性读,则委托给etcd层处理
        ShouldDelegate: !delegator.ConsistentReadSupported(),
    }, nil
}


判断是否需要委托给etcd层处理exact rv的list请求
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
    不是递归查询或者watchcache snapshots为nil则委托给etcd层处理
    if !recursive || c.watchCache.snapshots == nil {
        return delegator.Result{ShouldDelegate: true}, nil
    }
    极细listrv
    listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return delegator.Result{}, err
    }
    return c.shouldDelegateExactRV(listRV)
}

判断是否需要委托给etcd层处理exact rv的list请求
func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) {
    watchcache不够新(也就是watchcache中的resourceversion小于传入的)
    if c.watchCache.notFresh(rv) {
        return delegator.Result{
            如果不支持一致性读,则委托给etcd层处理
            ShouldDelegate: !delegator.ConsistentReadSupported(),
        }, nil
    }
    从watchcache snapshot中拿resourceversion
    _, canServe := c.watchCache.snapshots.GetLessOrEqual(rv)
    return delegator.Result{
        拿不到则委托给etcd层处理
        ShouldDelegate: !canServe,
    }, nil
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator/interface.go

判断是否支持一致性读
func ConsistentReadSupported() bool {
    consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
    requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
    如果开启了ConsistentListFromCache(1.34强制开启)且etcd支持RequestWatchProgress特性(小于3.4.31或者3.5.0到3.5.13之间不支持)
    return consistentListFromCacheEnabled && requestWatchProgressSupported
}

指标

名称

apiserver_watchcache_consistent_read_total

label

group
resource
success
fallback
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容