从源码看ShardedListAndWatch特性

背景

当前控制器新增实例后要不就是只有单一可以执行要不就是多个分片执行(但是客户端还是会收到全量事件)
k8s 1.36新增了ShardedListAndWatch这个特性来优化这个问题,1.36当前是alpha阶段默认关闭

简单介绍

ShardedListAndWatch这个特性让event分片在apiserver侧执行,客户端不需要收到全量的event

源码

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go中

list过滤
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
    ...
    如果开启了ShardedListAndWatch特性,且指定了ShardSelector,解析并设置设置ShardSelector
    if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) && options.ShardSelector != "" {
        sel, err := sharding.Parse(options.ShardSelector)
        if err != nil {
            return nil, fmt.Errorf("invalid shard selector: %w", err)
        }
        p.ShardSelector = sel
    }
    ...
    err := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), storageOpts, list)

}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中

获取list

func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
    ...
    等待缓存足够新然后获取list
    resp, indexUsed, err := c.watchCache.WaitUntilFreshAndGetList(ctx, preparedKey, opts)
    if err != nil {
        return err
    }
    ...
    for i, obj := range resp.Items {
        elem, ok := obj.(*store.Element)
        if !ok {
            return fmt.Errorf("non *store.Element returned from storage: %v", obj)
        }
        shardMatch := true
        if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
            var err error
            shardMatch, err = opts.Predicate.MatchesSharding(elem.Object)
            if err != nil {
                return fmt.Errorf("shard matching failed: %w", err)
            }
        }
        ...
        如果满足分片匹配且满足labels和fields过滤,则添加到selectedObjects
        if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
            selectedObjects = append(selectedObjects, elem.Object)
            lastSelectedObjectKey = elem.Key
        }
        ...
    }
    ...
}

staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go中

匹配分片
func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) {
    判断是否开启ShardedListAndWatch特性
    if !utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
        return true, nil
    }
    判断是否指定了ShardSelector且非空
    if s.ShardSelector != nil && !s.ShardSelector.Empty() {
        return s.ShardSelector.Matches(obj)
    }
    返回匹配成功
    return true, nil
}

staging/src/k8s.io/apiserver/pkg/sharding/parser.go

解析ShardSelector表达式
func Parse(expr string) (apisharding.Selector, error) {
    ...
    创建解析器
    p, err := celparser.NewParser(celparser.Macros())
    if err != nil {
        return nil, fmt.Errorf("failed to create CEL parser: %w", err)
    }
    解析ShardSelector表达式
    parsed, errs := p.Parse(common.NewTextSource(expr))
    if errs != nil && len(errs.GetErrors()) > 0 {
        return nil, fmt.Errorf("CEL parse error: %s", errs.GetErrors()[0].Message)
    }
    遍历ShardSelector表达式
    reqs, err := walkExpr(parsed.Expr())
    if err != nil {
        return nil, err
    }
    ...
    构建ShardSelector对象
    return apisharding.NewSelector(reqs...), nil
}

func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) {
        ...
        ||函数
        if fn == operators.LogicalOr {
            ...
            return append(left, right...), nil

        }
        ...
        shardRange函数
        if fn == "shardRange" {
            ...
            解析shardRange函数调用
            req, err := parseShardRangeCall(call)
            if err != nil {
                return nil, err
            }
            return []apisharding.ShardRangeRequirement{req}, nil
            ...
        }
        ...

}

解析shardRange函数调用
func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) {
    ...
    获取参数
    args := call.Args()
    if len(args) != 3 {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", len(args))
    }
    获取第一个参数作为fieldPath
    fieldPath, err := extractFieldPath(args[0])
    if err != nil {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() first argument: %w", err)
    }

    fieldpath只支持object.metadata.uid和object.metadata.namespace
    switch fieldPath {
    case "object.metadata.uid", "object.metadata.namespace":
        // ok
    default:
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath)
    }

    获取第二个参数作为hexStart
    hexStart, err := extractHexLiteral(args[1], "hexStart")
    if err != nil {
        return apisharding.ShardRangeRequirement{}, err
    }

    获取第三个参数作为hexEnd
    hexEnd, err := extractHexLiteral(args[2], "hexEnd")
    if err != nil {
        return apisharding.ShardRangeRequirement{}, err
    }
    hexStart必须小雨hexEnd
    if !apisharding.HexLess(hexStart, hexEnd) {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd)
    }

    return apisharding.ShardRangeRequirement{
        Key:   fieldPath,
        Start: hexStart,
        End:   hexEnd,
    }, nil
    ...
}

staging/src/k8s.io/apimachinery/pkg/sharding/selector.go中

构建ShardSelector对象
func NewSelector(reqs ...ShardRangeRequirement) Selector {
    if len(reqs) == 0 {
        return Everything()
    }
    return &shardSelector{
        requirements: reqs,
    }
}


匹配
func (s *shardSelector) Matches(obj runtime.Object) (bool, error) {
    ...
    获取field
    value, err := ResolveFieldValue(obj, key)
    if err != nil {
        return false, err
    }
    计算hash
    hash := "0x" + HashField(value)

    如果hash在start和end之间,则成功匹配
    for _, req := range s.requirements {
        if !HexLess(hash, req.Start) && HexLess(hash, req.End) {
            return true, nil
        }
    }
    否则不匹配
    return false, nil
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容