背景
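从 Kubernetes 1.34 起,ListFromCacheSnapshot 默认开启。为了及时发现 watch cache 与 etcd 之间的数据不一致,引入了 cache 与 etcd 的一致性检测,由 DetectCacheInconsistency 特性门控控制;另外,当环境变量 KUBE_WATCHCACHE_CONSISTENCY_CHECKER 为 true 时,即使该特性门控关闭也会启用检测。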
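检测方式:每隔 ConsistencyCheckPeriod(默认 5 分钟),分别从 cache 和 etcd 中获取 resourceVersion 以及数据的 digest(使用 hash/fnv 对 list 响应中每一项的 namespace/name/resourceVersion 计算)。etcd 以 cache 返回的 resourceVersion 做 Exact 读取;只有两边的 resourceVersion 一致时才比较 digest(不一致则本轮检查直接报错返回,不做判断)。若 digest 不一致,则根据环境变量 KUBE_WATCHCACHE_CONSISTENCY_CHECKER 决定行为:为 true 时直接 panic;否则将 cache 标记为不一致——在 ListFromCacheSnapshot 开启的情况下,关闭 snapshot 功能并清空 watch cache 中已有的 snapshot。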
源码
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中
var (
	// ConsistencyCheckPeriod is the interval between two consecutive
	// cache-versus-etcd consistency checks.
	ConsistencyCheckPeriod = 5 * time.Minute

	// panicOnCacheInconsistency makes a detected cache/etcd mismatch panic
	// instead of only marking the cache as inconsistent. It starts out false
	// and is overwritten from KUBE_WATCHCACHE_CONSISTENCY_CHECKER in init.
	panicOnCacheInconsistency bool
)
func init() {
从环境变量中读取KUBE_WATCHCACHE_CONSISTENCY_CHECKER,用于设置不一致时候是否panic
panicOnCacheInconsistency, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
}
// NewCacheDelegator builds a CacheDelegator around cacher and the underlying
// storage. The construction of d is elided in this excerpt.
//
// When the DetectCacheInconsistency feature gate is enabled, or
// panicOnCacheInconsistency was set from KUBE_WATCHCACHE_CONSISTENCY_CHECKER,
// it also creates a consistency checker that compares cacher against storage.
// The checker runs in a background goroutine tracked by d.wg and exits once
// d.stopCh is closed.
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
	...
	// The env-var override turns the checker on even if the feature gate is off.
	if utilfeature.DefaultFeatureGate.Enabled(features.DetectCacheInconsistency) || panicOnCacheInconsistency {
		// storage is passed as the checker's etcd source; cacher is the cache side.
		d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.groupResource, cacher.newListFunc, cacher, storage)
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			// Blocks, checking periodically, until d.stopCh is closed.
			d.checker.startChecking(d.stopCh)
		}()
	}
	return d
}
构建一致性检查器
func newConsistencyChecker(resourcePrefix string, groupResource schema.GroupResource, newListFunc func() runtime.Object, cacher cacher, etcd getLister) *consistencyChecker {
return &consistencyChecker{
groupResource: groupResource,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
cacher: cacher,
etcd: etcd,
}
}
启动一致性检查器
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
定期进行一致性检查
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.V(3).InfoS("Cache consistency check exiting", "group", c.groupResource.Group, "resource", c.groupResource.Resource, "err", err)
}
}
// check performs a single consistency comparison between the watch cache and
// etcd.
//
// If the digests cannot be computed (for example, because the two sides
// reported different resource versions), it returns early; handling is elided
// in this excerpt. Equal digests mean cache and etcd agree. On a mismatch it
// panics if panicOnCacheInconsistency is set (from
// KUBE_WATCHCACHE_CONSISTENCY_CHECKER). Otherwise it marks the cacher as
// inconsistent via MarkConsistent(false).
func (c *consistencyChecker) check(ctx context.Context) {
	// Compute digests of the cache and of etcd at the same resource version.
	digests, err := c.calculateDigests(ctx)
	if err != nil {
		...
		return
	}
	// Digests match: cache and etcd hold the same data.
	if digests.CacheDigest == digests.EtcdDigest {
		...
		return
	}
	...
	// Digests differ: decide whether to crash the process.
	if panicOnCacheInconsistency {
		// Crash with group/resource, resource version and both digests in the message.
		panic(fmt.Sprintf("Cache consistency check failed, group: %q, resource: %q, resourceVersion: %q, etcdDigest: %q, cacheDigest: %q", c.groupResource.Group, c.groupResource.Resource, digests.ResourceVersion, digests.EtcdDigest, digests.CacheDigest))
	}
	...
	// Not panicking: flag the cache as inconsistent instead.
	c.cacher.MarkConsistent(false)
}
计算cache和etcd数据的digests
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
计算cache数据的digest和resourceVersion
cacheDigest, cacheResourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, "0", 0)
if err != nil {
return nil, fmt.Errorf("failed calculating cache digest: %w", err)
}
计算etcd数据的digest和resourceVersion
etcdDigest, etcdResourceVersion, err := c.calculateStoreDigest(ctx, c.etcd, cacheResourceVersion, storageWatchListPageSize)
if err != nil {
return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
}
如果cache和etcd的resourceVersion不一致则返回
if cacheResourceVersion != etcdResourceVersion {
return nil, fmt.Errorf("etcd returned different resource version then expected, cache: %q, etcd: %q", cacheResourceVersion, etcdResourceVersion)
}
返回digests
return &storageDigest{
ResourceVersion: cacheResourceVersion,
CacheDigest: cacheDigest,
EtcdDigest: etcdDigest,
}, nil
}
计算store数据的digest和resourceVersion
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, resourceVersion string, limit int64) (digest, rv string, err error) {
opts := storage.ListOptions{
Recursive: true,
Predicate: storage.Everything,
ResourceVersion: resourceVersion,
}
opts.Predicate.Limit = limit
if resourceVersion == "0" {
opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
} else {
opts.ResourceVersionMatch = metav1.ResourceVersionMatchExact
}
h := fnv.New64()
for {
resp := c.newListFunc()
err = store.GetList(ctx, c.resourcePrefix, opts, resp)
if err != nil {
return "", "", err
}
写入数据用于计算digest
err = addListToDigest(h, resp)
if err != nil {
return "", "", err
}
list, err := meta.ListAccessor(resp)
if err != nil {
return "", "", err
}
if resourceVersion == "0" {
resourceVersion = list.GetResourceVersion()
}
continueToken := list.GetContinue()
如果没数据了则返回
if continueToken == "" {
return fmt.Sprintf("%x", h.Sum64()), resourceVersion, nil
}
opts.Predicate.Continue = continueToken
opts.ResourceVersion = ""
opts.ResourceVersionMatch = ""
}
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
标记cache是否一致
func (c *Cacher) MarkConsistent(consistent bool) {
c.watchCache.MarkConsistent(consistent)
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go中
标记watchCache是否一致
func (w *watchCache) MarkConsistent(consistent bool) {
如果开启了ListFromCacheSnapshot
if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
snapshottingEnabled标记为false
w.snapshottingEnabled.Store(consistent)
if !consistent && w.snapshots != nil {
重置watch cache的snapshot
w.snapshots.Reset()
}
}
}