背景
当前控制器新增实例后要不就是只有单一可以执行要不就是多个分片执行(但是客户端还是会收到全量事件)
k8s 1.36新增了ShardedListAndWatch这个特性来优化这个问题,1.36当前是alpha阶段默认关闭
简单介绍
ShardedListAndWatch这个特性让event分片在apiserver侧执行,客户端不需要收到全量的event
源码
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go中
list过滤
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
...
如果开启了ShardedListAndWatch特性,且指定了ShardSelector,解析并设置设置ShardSelector
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) && options.ShardSelector != "" {
sel, err := sharding.Parse(options.ShardSelector)
if err != nil {
return nil, fmt.Errorf("invalid shard selector: %w", err)
}
p.ShardSelector = sel
}
...
err := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), storageOpts, list)
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
获取list
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
...
等待缓存足够新然后获取list
resp, indexUsed, err := c.watchCache.WaitUntilFreshAndGetList(ctx, preparedKey, opts)
if err != nil {
return err
}
...
for i, obj := range resp.Items {
elem, ok := obj.(*store.Element)
if !ok {
return fmt.Errorf("non *store.Element returned from storage: %v", obj)
}
shardMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
var err error
shardMatch, err = opts.Predicate.MatchesSharding(elem.Object)
if err != nil {
return fmt.Errorf("shard matching failed: %w", err)
}
}
...
如果满足分片匹配且满足labels和fields过滤,则添加到selectedObjects
if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
...
}
...
}
staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go中
匹配分片
func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) {
判断是否开启ShardedListAndWatch特性
if !utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
return true, nil
}
判断是否指定了ShardSelector且非空
if s.ShardSelector != nil && !s.ShardSelector.Empty() {
return s.ShardSelector.Matches(obj)
}
返回匹配成功
return true, nil
}
staging/src/k8s.io/apiserver/pkg/sharding/parser.go
解析ShardSelector表达式
func Parse(expr string) (apisharding.Selector, error) {
...
创建解析器
p, err := celparser.NewParser(celparser.Macros())
if err != nil {
return nil, fmt.Errorf("failed to create CEL parser: %w", err)
}
解析ShardSelector表达式
parsed, errs := p.Parse(common.NewTextSource(expr))
if errs != nil && len(errs.GetErrors()) > 0 {
return nil, fmt.Errorf("CEL parse error: %s", errs.GetErrors()[0].Message)
}
遍历ShardSelector表达式
reqs, err := walkExpr(parsed.Expr())
if err != nil {
return nil, err
}
...
构建ShardSelector对象
return apisharding.NewSelector(reqs...), nil
}
func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) {
...
||函数
if fn == operators.LogicalOr {
...
return append(left, right...), nil
}
...
shardRange函数
if fn == "shardRange" {
...
解析shardRange函数调用
req, err := parseShardRangeCall(call)
if err != nil {
return nil, err
}
return []apisharding.ShardRangeRequirement{req}, nil
...
}
...
}
解析shardRange函数调用
func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) {
...
获取参数
args := call.Args()
if len(args) != 3 {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", len(args))
}
获取第一个参数作为fieldPath
fieldPath, err := extractFieldPath(args[0])
if err != nil {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() first argument: %w", err)
}
fieldpath只支持object.metadata.uid和object.metadata.namespace
switch fieldPath {
case "object.metadata.uid", "object.metadata.namespace":
// ok
default:
return apisharding.ShardRangeRequirement{}, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath)
}
获取第二个参数作为hexStart
hexStart, err := extractHexLiteral(args[1], "hexStart")
if err != nil {
return apisharding.ShardRangeRequirement{}, err
}
获取第三个参数作为hexEnd
hexEnd, err := extractHexLiteral(args[2], "hexEnd")
if err != nil {
return apisharding.ShardRangeRequirement{}, err
}
hexStart必须小雨hexEnd
if !apisharding.HexLess(hexStart, hexEnd) {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd)
}
return apisharding.ShardRangeRequirement{
Key: fieldPath,
Start: hexStart,
End: hexEnd,
}, nil
...
}
staging/src/k8s.io/apimachinery/pkg/sharding/selector.go中
构建ShardSelector对象
func NewSelector(reqs ...ShardRangeRequirement) Selector {
if len(reqs) == 0 {
return Everything()
}
return &shardSelector{
requirements: reqs,
}
}
匹配
func (s *shardSelector) Matches(obj runtime.Object) (bool, error) {
...
获取field
value, err := ResolveFieldValue(obj, key)
if err != nil {
return false, err
}
计算hash
hash := "0x" + HashField(value)
如果hash在start和end之间,则成功匹配
for _, req := range s.requirements {
if !HexLess(hash, req.Start) && HexLess(hash, req.End) {
return true, nil
}
}
否则不匹配
return false, nil
}