从源码看DetectCacheInconsistency特性

背景

从 Kubernetes 1.34 开始,ListFromCacheSnapshot 默认开启。为了及时发现 cache 与 etcd 之间的数据不一致,引入了 cache 与 etcd 的一致性检测,由 DetectCacheInconsistency 特性门控控制。
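判断方式:分别从 cache 和 etcd 全量 list 一次,拿到各自的 resourceVersion 以及数据的 digest(使用 Go 标准库 hash/fnv 的 64 位 FNV 哈希,对 list 结果中每一项的 namespace/name/resourceVersion 进行计算)。etcd 侧以 cache 返回的 resourceVersion 做精确(Exact)读取;若两边 resourceVersion 仍不一致,本次检查直接返回错误,不比较 digest。若 resourceVersion 一致但 digest 不一致,则根据环境变量 KUBE_WATCHCACHE_CONSISTENCY_CHECKER 决定行为:为 true 时直接 panic;否则将 cache 标记为不一致,即在开启 ListFromCacheSnapshot 时关闭 snapshot 并清空已有的 cache snapshot。另外,只要该环境变量为 true,即使没有开启 DetectCacheInconsistency 特性门控,一致性检查也会启动。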

源码

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中


var (
	// ConsistencyCheckPeriod is the interval between two consecutive
	// cache-vs-etcd consistency checks.
	ConsistencyCheckPeriod = 5 * time.Minute
	// panicOnCacheInconsistency makes a detected inconsistency fatal (panic)
	// instead of only marking the cache as inconsistent. It is populated from
	// the KUBE_WATCHCACHE_CONSISTENCY_CHECKER environment variable in init.
	panicOnCacheInconsistency = false
)

// init reads KUBE_WATCHCACHE_CONSISTENCY_CHECKER to decide whether a detected
// cache inconsistency should panic.
func init() {
	// The parse error is deliberately ignored: strconv.ParseBool returns false
	// for an unset or unparsable value, which leaves panicking disabled.
	panicOnCacheInconsistency, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
}



构建CacheDelegator
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
    ...
    如果开启了DetectCacheInconsistency,则构建一致性检查器
    if utilfeature.DefaultFeatureGate.Enabled(features.DetectCacheInconsistency) || panicOnCacheInconsistency {
        构建一致性检查器
        d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.groupResource, cacher.newListFunc, cacher, storage)
        d.wg.Add(1)
        go func() {
            defer d.wg.Done()
            启动一致性检查器
            d.checker.startChecking(d.stopCh)
        }()
    }
    return d
}

构建一致性检查器
func newConsistencyChecker(resourcePrefix string, groupResource schema.GroupResource, newListFunc func() runtime.Object, cacher cacher, etcd getLister) *consistencyChecker {
    return &consistencyChecker{
        groupResource:  groupResource,
        resourcePrefix: resourcePrefix,
        newListFunc:    newListFunc,
        cacher:         cacher,
        etcd:           etcd,
    }
}

启动一致性检查器
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
    定期进行一致性检查
    err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
        c.check(ctx)
        return false, nil
    })
    if err != nil {
        klog.V(3).InfoS("Cache consistency check exiting", "group", c.groupResource.Group, "resource", c.groupResource.Resource, "err", err)
    }
}

实际的一致性检查
func (c *consistencyChecker) check(ctx context.Context) {
    获取cache和etcd数据的digests
    digests, err := c.calculateDigests(ctx)
    if err != nil {
        ...
        return
    }
    一致
    if digests.CacheDigest == digests.EtcdDigest {
        ...
        return
    }
    ...
    判断不一致是否panic
    if panicOnCacheInconsistency {
        不一致则panic
        panic(fmt.Sprintf("Cache consistency check failed, group: %q, resource: %q, resourceVersion: %q, etcdDigest: %q, cacheDigest: %q", c.groupResource.Group, c.groupResource.Resource, digests.ResourceVersion, digests.EtcdDigest, digests.CacheDigest))
    }
    ...
    标记缓存不一致
    c.cacher.MarkConsistent(false)
}


计算cache和etcd数据的digests
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
    计算cache数据的digest和resourceVersion
    cacheDigest, cacheResourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, "0", 0)
    if err != nil {
        return nil, fmt.Errorf("failed calculating cache digest: %w", err)
    }
    计算etcd数据的digest和resourceVersion
    etcdDigest, etcdResourceVersion, err := c.calculateStoreDigest(ctx, c.etcd, cacheResourceVersion, storageWatchListPageSize)
    if err != nil {
        return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
    }
    如果cache和etcd的resourceVersion不一致则返回
    if cacheResourceVersion != etcdResourceVersion {
        return nil, fmt.Errorf("etcd returned different resource version then expected, cache: %q, etcd: %q", cacheResourceVersion, etcdResourceVersion)
    }
    返回digests
    return &storageDigest{
        ResourceVersion: cacheResourceVersion,
        CacheDigest:     cacheDigest,
        EtcdDigest:      etcdDigest,
    }, nil
}


计算store数据的digest和resourceVersion
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, resourceVersion string, limit int64) (digest, rv string, err error) {
    opts := storage.ListOptions{
        Recursive:       true,
        Predicate:       storage.Everything,
        ResourceVersion: resourceVersion,
    }
    opts.Predicate.Limit = limit
    if resourceVersion == "0" {
        opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
    } else {
        opts.ResourceVersionMatch = metav1.ResourceVersionMatchExact
    }
    h := fnv.New64()
    for {
        resp := c.newListFunc()
        err = store.GetList(ctx, c.resourcePrefix, opts, resp)
        if err != nil {
            return "", "", err
        }
        写入数据用于计算digest
        err = addListToDigest(h, resp)
        if err != nil {
            return "", "", err
        }
        list, err := meta.ListAccessor(resp)
        if err != nil {
            return "", "", err
        }
        if resourceVersion == "0" {
            resourceVersion = list.GetResourceVersion()
        }
        continueToken := list.GetContinue()
        如果没数据了则返回
        if continueToken == "" {
            return fmt.Sprintf("%x", h.Sum64()), resourceVersion, nil
        }
        opts.Predicate.Continue = continueToken
        opts.ResourceVersion = ""
        opts.ResourceVersionMatch = ""
    }
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中

标记cache是否一致
func (c *Cacher) MarkConsistent(consistent bool) {
    c.watchCache.MarkConsistent(consistent)
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go中

标记watchCache是否一致
func (w *watchCache) MarkConsistent(consistent bool) {
    如果开启了ListFromCacheSnapshot
    if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
        snapshottingEnabled标记为false
        w.snapshottingEnabled.Store(consistent)
        if !consistent && w.snapshots != nil {
            重置watch cache的snapshot
            w.snapshots.Reset()
        }
    }
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容