[k8s源码分析][kube-scheduler]scheduler之自定义调度器(1)

1. 前言

转载请说明原文出处, 尊重他人劳动成果!

本文将分析kube-scheduler的自定义调度器, 本文针对的是可以自己选择合适的预选和优选方法.
源码位置: https://github.com/nicktming/kubernetes
分支: tming-v1.13 (基于v1.13版本)

2. 不带扩展方法

architecture.png

2.1 例子

集群安装可以参考k8s源码编译以及二进制安装(用于源码开发调试版).

2.2.1 准备配置文件

因为需要自定义预选和优选方法以及扩展方法, 所以肯定需要配置文件(schedulerConfig.yaml)

apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system

下面是policy文件.

{
  "kind" : "Policy",
  "apiVersion" : "v1",
  "predicates" : [
  {"name" : "PodFitsHostPorts"},
  {"name" : "PodFitsResources"},
  {"name" : "NoDiskConflict"},
  {"name" : "MatchNodeSelector"},
  {"name" : "HostName"}
  ],
  "priorities" : [
  {"name" : "LeastRequestedPriority", "weight" : 1},
  {"name" : "BalancedResourceAllocation", "weight" : 1},
  {"name" : "ServiceSpreadingPriority", "weight" : 1},
  {"name" : "EqualPriority", "weight" : 1}
  ],
  "hardPodAffinitySymmetricWeight" : 10
}
2.1.2 运行以及测试

接着重新启动kube-scheduler用如下命令./kube-scheduler --master=http://localhost:8080 --config=schedulerConfig.yaml. (关于schedulerConfig.yamlpolicy.yaml自行修改就行)

接着部署一个带有schedulerNamepod和一个不带有schedulerNamepod.

[root@master kubectl]# cat pod-scheduler.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# cat pod.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml 
[root@master kubectl]# ./kubectl apply -f pod.yaml 
[root@master kubectl]# ./kubectl get pods 
NAME            READY   STATUS    RESTARTS   AGE
test            0/1     Pending   0          83s
test-schduler   1/1     Running   0          13m

可以看到带有schedulerNamepod, 也就是test-scheduler已经成功运行.

[root@master kubectl]# ./kubectl describe pod test-scheduler
...
Events:
  Type     Reason             Age                 From                  Message
  ----     ------             ----                ----                  -------
  Normal   Scheduled          12m                 my-scheduler          Successfully assigned default/test-schduler to 172.21.0.12
  Normal   Pulling            12m                 kubelet, 172.21.0.12  pulling image "nginx"
  Normal   Pulled             11m                 kubelet, 172.21.0.12  Successfully pulled image "nginx"
  Normal   Created            11m                 kubelet, 172.21.0.12  Created container
  Normal   Started            11m                 kubelet, 172.21.0.12  Started container
  Warning  MissingClusterDNS  62s (x12 over 12m)  kubelet, 172.21.0.12  pod: "test-schduler_default(213933b8-efda-11e9-9434-525400d54f7e)". kubelet does not have ClusterDNS IP configured and cannot create Pod using "ClusterFirst" policy. Falling back to "Default" policy.

而没有带有schedulerNamepod, 也就是test一直处于pending状态, 因为没有设置schedulerName的情况下默认使用k8s默认的调度器, 但是默认的调度器目前没有启动, 所以无法调度. 如果此时, 再启动一个不带config参数的调度器, 那就该pod就会被调度. 关于默认调度器可以参考 [k8s源码分析][kube-scheduler]scheduler/algorithmprovider之注册default-scheduler

2.2 源码分析

2.2.1 解析文件

其实该部分的源码大部分已经在[k8s源码分析][kube-scheduler]scheduler之启动run(1) 中已经分析了, 所以这里就尽量从简. 解析kube-scheduler中的config如下.

NewSchedulerCommand -> runCommand -> opts.Config() -> o.ApplyTo(c) 

所以最终是到这里会来加载config文件中的内容.

// cmd/kube-scheduler/app/options/options.go

func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    // 如果kube-scheduler 没有指定--config 就是从默认配置(o.ComponentConfig)拿
    if len(o.ConfigFile) == 0 {
        ...
    } else {
        // 如果kube-scheduler 指定了--config 那就会从配置文件中取
        cfg, err := loadConfigFromFile(o.ConfigFile)
        if err != nil {
            return err
        }

        // use the loaded config file only, with the exception of --address and --port. This means that
        // none of the deprectated flags in o.Deprecated are taken into consideration. This is the old
        // behaviour of the flags we have to keep.
        c.ComponentConfig = *cfg

        if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
            return err
        }
    }
    ...
}

// cmd/kube-scheduler/app/options/configfile.go

func loadConfigFromFile(file string) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
    data, err := ioutil.ReadFile(file)
    if err != nil {
        return nil, err
    }

    return loadConfig(data)
}

func loadConfig(data []byte) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
    configObj := &kubeschedulerconfig.KubeSchedulerConfiguration{}
    if err := runtime.DecodeInto(kubeschedulerscheme.Codecs.UniversalDecoder(), data, configObj); err != nil {
        return nil, err
    }

    return configObj, nil
}

进而把config(schedulerConfig.yaml)中的内容转化成一个kubeschedulerconfig.KubeSchedulerConfiguration对象如下:

// pkg/scheduler/apis/config/types.go 

type KubeSchedulerConfiguration struct {
    metav1.TypeMeta
    // SchedulerName is name of the scheduler, used to select which pods
    // will be processed by this scheduler, based on pod's "spec.SchedulerName".
    SchedulerName string
    // AlgorithmSource specifies the scheduler algorithm source.
    AlgorithmSource SchedulerAlgorithmSource
    // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
    // corresponding to every RequiredDuringScheduling affinity rule.
    // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
    HardPodAffinitySymmetricWeight int32
    // 高可用的时候会分析
    LeaderElection KubeSchedulerLeaderElectionConfiguration
    ClientConnection apimachineryconfig.ClientConnectionConfiguration
    // defaulting to 0.0.0.0:10251
    HealthzBindAddress string
    // serve on, defaulting to 0.0.0.0:10251.
    MetricsBindAddress string
    apiserverconfig.DebuggingConfiguration
    // 是否禁止抢占
    DisablePreemption bool
    PercentageOfNodesToScore int32
    FailureDomains string
    BindTimeoutSeconds *int64
}

type SchedulerAlgorithmSource struct {
    // Policy 策略
    Policy *SchedulerPolicySource
    // Provider is the name of a scheduling algorithm provider to use.
    Provider *string
}
type SchedulerPolicySource struct {
    // 从文件中读
    File *SchedulerPolicyFileSource
    // 从configmap中读
    ConfigMap *SchedulerPolicyConfigMapSource
}

// 高可用
type KubeSchedulerLeaderElectionConfiguration struct {
    apiserverconfig.LeaderElectionConfiguration
    // LockObjectNamespace defines the namespace of the lock object
    LockObjectNamespace string
    // LockObjectName defines the lock object name
    LockObjectName string
}
type LeaderElectionConfiguration struct {
    LeaderElect bool
    LeaseDuration metav1.Duration
    RenewDeadline metav1.Duration
    RetryPeriod metav1.Duration
    ResourceLock string
}

// k8s.io/apimachinery/pkg/apis/meta/v1/types.go
type TypeMeta struct {
    Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
    APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

可以看到c.ComponentConfig = *cfg就是这个schedulerConfig.yaml所转化的kubeschedulerconfig.KubeSchedulerConfiguration.

2.2.2 解析algorithmSource

接着就是

runCommand -> Run(cc, stopCh) -> scheduler.New```

注意: scheduler.New传进来的kubeschedulerconfig.SchedulerAlgorithmSource就是cc.ComponentConfig.AlgorithmSource也就是schedulerConfig.yaml中的algorithmSource.

// New returns a Scheduler
func New(client clientset.Interface,
    ...
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {
    ...
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // 默认调度器会进入到这里 *source.Provider = DefaultProvider
        ...
    case source.Policy != nil:
        // 自定义调度器会进入到这里
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    ...
}

可以到initPolicyFromFile方法根据source.Policy.File.Path也就是policy.yaml的路径读取内容并进行解析, 然后转化成schedulerapi.Policy对象.

type Policy struct {
    metav1.TypeMeta
    Predicates []PredicatePolicy
    Priorities []PriorityPolicy
    ExtenderConfigs []ExtenderConfig
    HardPodAffinitySymmetricWeight int32
    AlwaysCheckAllPredicates bool
}
type PredicatePolicy struct {
    Name string
    Argument *PredicateArgument
}
type PriorityPolicy struct {
    Name string
    Weight int
    Argument *PriorityArgument
}
type PredicateArgument struct {
    ServiceAffinity *ServiceAffinity
    LabelsPresence *LabelsPresence
}
type PriorityArgument struct {
    ServiceAntiAffinity *ServiceAntiAffinity
    LabelPreference *LabelPreference
    RequestedToCapacityRatioArguments *RequestedToCapacityRatioArguments
}
type ExtenderConfig struct {
    URLPrefix string
    FilterVerb string
    PreemptVerb string
    PrioritizeVerb string
    Weight int
    BindVerb string
    EnableHTTPS bool
    TLSConfig *restclient.TLSClientConfig
    HTTPTimeout time.Duration
    NodeCacheCapable bool
    ManagedResources []ExtenderManagedResource
    Ignorable bool
}

关于配置扩展方法的也列出来了, 不用多说, 结构体中的内容与yaml内容对应解析的.

2.2.3 根据policy生成factory.config
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

    // validate the policy configuration
    if err := validation.ValidatePolicy(policy); err != nil {
        return nil, err
    }

    // 生成预选方法的key
    // 如果没有 就那默认的那些预选方法的key
    predicateKeys := sets.NewString()
    if policy.Predicates == nil {
        klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        predicateKeys = provider.FitPredicateKeys
    } else {
        for _, predicate := range policy.Predicates {
            klog.V(2).Infof("Registering predicate: %s", predicate.Name)
            predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
        }
    }

    // 生成优选方法的key
    // 如果没有 就那默认的那些优选方法的key
    priorityKeys := sets.NewString()
    if policy.Priorities == nil {
        klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
        provider, err := GetAlgorithmProvider(DefaultProvider)
        if err != nil {
            return nil, err
        }
        priorityKeys = provider.PriorityFunctionKeys
    } else {
        for _, priority := range policy.Priorities {
            klog.V(2).Infof("Registering priority: %s", priority.Name)
            priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
        }
    }

    // 生成扩展
    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
    // Give it higher precedence than scheduler CLI configuration when it is provided.
    if policy.HardPodAffinitySymmetricWeight != 0 {
        c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
    }
    // When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
    // predicates even after one or more of them fails.
    if policy.AlwaysCheckAllPredicates {
        c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
    }

    // 根据预选, 优选, 扩展方法进行生成config
    return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}

CreateFromKeys[k8s源码分析][kube-scheduler]scheduler之启动run(1) 已经分析过了, 这个主要注重一下extenders.

func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
    klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

    if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
    }

    // 根据当前的预选key得到所有的预选方法
    predicateFuncs, err := c.GetPredicates(predicateKeys)
    if err != nil {
        return nil, err
    }

    // 根据当前的优选key得到所有的优选方法
    priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
    if err != nil {
        return nil, err
    }

    // priorityMetaProducer 在算分的时候会用到
    priorityMetaProducer, err := c.GetPriorityMetadataProducer()
    if err != nil {
        return nil, err
    }
    // predicateMetaProducer 在真正预选的时候会用到
    predicateMetaProducer, err := c.GetPredicateMetadataProducer()
    if err != nil {
        return nil, err
    }

    // 是否打开了加速predicate的equivalence class cache
    // Init equivalence class cache
    if c.enableEquivalenceClassCache {
        c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
        klog.Info("Created equivalence class cache")
    }

    // 生成真正进行调度计算的Algorithm algorithm.ScheduleAlgorithm
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.equivalencePodCache,
        c.podQueue,
        predicateFuncs,
        predicateMetaProducer,
        priorityConfigs,
        priorityMetaProducer,
        extenders,
        c.volumeBinder,
        c.pVCLister,
        c.pdbLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )
    ...
}

3. 总结

本文使用了一个例子来说明自定义调度器是如何使用, 比如可以自己确定使用哪些预选和优选方法, 后面会继续分析如何使用自己扩展的预选和优选方法.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352