1. Introduction
Please credit the original source when reposting, and respect others' work!
This article analyzes custom schedulers for `kube-scheduler`, specifically how to pick exactly which predicate and priority methods to use.
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Without extender methods
2.1 Example
For cluster installation, refer to k8s源码编译以及二进制安装(用于源码开发调试版).
2.1.1 Preparing the configuration files
Since we want to customize the predicate, priority, and extender methods, a configuration file (`schedulerConfig.yaml`) is needed:
```yaml
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
```
Below is the policy file (`policy.yaml`):
```json
{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsHostPorts"},
    {"name": "PodFitsResources"},
    {"name": "NoDiskConflict"},
    {"name": "MatchNodeSelector"},
    {"name": "HostName"}
  ],
  "priorities": [
    {"name": "LeastRequestedPriority", "weight": 1},
    {"name": "BalancedResourceAllocation", "weight": 1},
    {"name": "ServiceSpreadingPriority", "weight": 1},
    {"name": "EqualPriority", "weight": 1}
  ],
  "hardPodAffinitySymmetricWeight": 10
}
```
2.1.2 Running and testing
Next, restart `kube-scheduler` with the following command: `./kube-scheduler --master=http://localhost:8080 --config=schedulerConfig.yaml`. (Adjust the paths of `schedulerConfig.yaml` and `policy.yaml` to your setup.)
Then deploy one pod with `schedulerName` set and one pod without it.
```
[root@master kubectl]# cat pod-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# cat pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml
[root@master kubectl]# ./kubectl apply -f pod.yaml
[root@master kubectl]# ./kubectl get pods
NAME            READY   STATUS    RESTARTS   AGE
test            0/1     Pending   0          83s
test-schduler   1/1     Running   0          13m
```
As you can see, the pod with `schedulerName` set, i.e. `test-schduler`, is already running.
```
[root@master kubectl]# ./kubectl describe pod test-schduler
...
Events:
  Type     Reason             Age                 From                  Message
  ----     ------             ----                ----                  -------
  Normal   Scheduled          12m                 my-scheduler          Successfully assigned default/test-schduler to 172.21.0.12
  Normal   Pulling            12m                 kubelet, 172.21.0.12  pulling image "nginx"
  Normal   Pulled             11m                 kubelet, 172.21.0.12  Successfully pulled image "nginx"
  Normal   Created            11m                 kubelet, 172.21.0.12  Created container
  Normal   Started            11m                 kubelet, 172.21.0.12  Started container
  Warning  MissingClusterDNS  62s (x12 over 12m)  kubelet, 172.21.0.12  pod: "test-schduler_default(213933b8-efda-11e9-9434-525400d54f7e)". kubelet does not have ClusterDNS IP configured and cannot create Pod using "ClusterFirst" policy. Falling back to "Default" policy.
```
The pod without `schedulerName`, i.e. `test`, stays in `Pending`: when `schedulerName` is not set, the pod is handled by the default k8s scheduler, which is not currently running, so it cannot be scheduled. If you now start another scheduler without the `--config` flag, this pod will get scheduled. For the default scheduler, see [k8s源码分析][kube-scheduler]scheduler/algorithmprovider之注册default-scheduler.
2.2 Source code analysis
2.2.1 Parsing the config file
Most of this code was already analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(1), so this section stays brief. Parsing the config in `kube-scheduler` follows this call chain:
NewSchedulerCommand -> runCommand -> opts.Config() -> o.ApplyTo(c)
So the contents of the config file are ultimately loaded here:
```go
// cmd/kube-scheduler/app/options/options.go
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
	// if kube-scheduler was started without --config, take the default
	// configuration (o.ComponentConfig)
	if len(o.ConfigFile) == 0 {
		...
	} else {
		// if kube-scheduler was given --config, load from the config file
		cfg, err := loadConfigFromFile(o.ConfigFile)
		if err != nil {
			return err
		}
		// use the loaded config file only, with the exception of --address and --port. This means that
		// none of the deprecated flags in o.Deprecated are taken into consideration. This is the old
		// behaviour of the flags we have to keep.
		c.ComponentConfig = *cfg
		if err := o.CombinedInsecureServing.ApplyToFromLoadedConfig(c, &c.ComponentConfig); err != nil {
			return err
		}
	}
	...
}
```
```go
// cmd/kube-scheduler/app/options/configfile.go
func loadConfigFromFile(file string) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	data, err := ioutil.ReadFile(file)
	if err != nil {
		return nil, err
	}
	return loadConfig(data)
}

func loadConfig(data []byte) (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	configObj := &kubeschedulerconfig.KubeSchedulerConfiguration{}
	if err := runtime.DecodeInto(kubeschedulerscheme.Codecs.UniversalDecoder(), data, configObj); err != nil {
		return nil, err
	}
	return configObj, nil
}
```
This converts the contents of the config (`schedulerConfig.yaml`) into a `kubeschedulerconfig.KubeSchedulerConfiguration` object, shown below:
```go
// pkg/scheduler/apis/config/types.go
type KubeSchedulerConfiguration struct {
	metav1.TypeMeta
	// SchedulerName is name of the scheduler, used to select which pods
	// will be processed by this scheduler, based on pod's "spec.SchedulerName".
	SchedulerName string
	// AlgorithmSource specifies the scheduler algorithm source.
	AlgorithmSource SchedulerAlgorithmSource
	// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
	// corresponding to every RequiredDuringScheduling affinity rule.
	// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
	HardPodAffinitySymmetricWeight int32
	// analyzed later when covering high availability
	LeaderElection KubeSchedulerLeaderElectionConfiguration
	ClientConnection apimachineryconfig.ClientConnectionConfiguration
	// defaulting to 0.0.0.0:10251
	HealthzBindAddress string
	// serve on, defaulting to 0.0.0.0:10251.
	MetricsBindAddress string
	apiserverconfig.DebuggingConfiguration
	// whether preemption is disabled
	DisablePreemption bool
	PercentageOfNodesToScore int32
	FailureDomains string
	BindTimeoutSeconds *int64
}

type SchedulerAlgorithmSource struct {
	// Policy is a policy-based scheduling configuration
	Policy *SchedulerPolicySource
	// Provider is the name of a scheduling algorithm provider to use.
	Provider *string
}

type SchedulerPolicySource struct {
	// read from a file
	File *SchedulerPolicyFileSource
	// read from a ConfigMap
	ConfigMap *SchedulerPolicyConfigMapSource
}

// high availability
type KubeSchedulerLeaderElectionConfiguration struct {
	apiserverconfig.LeaderElectionConfiguration
	// LockObjectNamespace defines the namespace of the lock object
	LockObjectNamespace string
	// LockObjectName defines the lock object name
	LockObjectName string
}

type LeaderElectionConfiguration struct {
	LeaderElect   bool
	LeaseDuration metav1.Duration
	RenewDeadline metav1.Duration
	RetryPeriod   metav1.Duration
	ResourceLock  string
}

// k8s.io/apimachinery/pkg/apis/meta/v1/types.go
type TypeMeta struct {
	Kind       string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}
```
As you can see, `c.ComponentConfig = *cfg` is exactly the `kubeschedulerconfig.KubeSchedulerConfiguration` converted from `schedulerConfig.yaml`.
2.2.2 Parsing algorithmSource
Next comes runCommand -> Run(cc, stopCh) -> scheduler.New.
Note: the `kubeschedulerconfig.SchedulerAlgorithmSource` passed into `scheduler.New` is `cc.ComponentConfig.AlgorithmSource`, i.e. the `algorithmSource` from `schedulerConfig.yaml`.
```go
// New returns a Scheduler
func New(client clientset.Interface,
	...
	schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
	stopCh <-chan struct{},
	opts ...func(o *schedulerOptions)) (*Scheduler, error) {
	...
	var config *factory.Config
	source := schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// the default scheduler takes this branch (*source.Provider = DefaultProvider)
		...
	case source.Policy != nil:
		// a custom scheduler takes this branch
		// Create the config from a user specified policy source.
		policy := &schedulerapi.Policy{}
		switch {
		case source.Policy.File != nil:
			if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
				return nil, err
			}
		case source.Policy.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		sc, err := configurator.CreateFromConfig(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		config = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
	...
}
```
As you can see, the `initPolicyFromFile` method reads the file at `source.Policy.File.Path` (the path of `policy.yaml`), parses its contents, and converts them into a `schedulerapi.Policy` object:
```go
type Policy struct {
	metav1.TypeMeta
	Predicates                     []PredicatePolicy
	Priorities                     []PriorityPolicy
	ExtenderConfigs                []ExtenderConfig
	HardPodAffinitySymmetricWeight int32
	AlwaysCheckAllPredicates       bool
}

type PredicatePolicy struct {
	Name     string
	Argument *PredicateArgument
}

type PriorityPolicy struct {
	Name     string
	Weight   int
	Argument *PriorityArgument
}

type PredicateArgument struct {
	ServiceAffinity *ServiceAffinity
	LabelsPresence  *LabelsPresence
}

type PriorityArgument struct {
	ServiceAntiAffinity               *ServiceAntiAffinity
	LabelPreference                   *LabelPreference
	RequestedToCapacityRatioArguments *RequestedToCapacityRatioArguments
}

type ExtenderConfig struct {
	URLPrefix        string
	FilterVerb       string
	PreemptVerb      string
	PrioritizeVerb   string
	Weight           int
	BindVerb         string
	EnableHTTPS      bool
	TLSConfig        *restclient.TLSClientConfig
	HTTPTimeout      time.Duration
	NodeCacheCapable bool
	ManagedResources []ExtenderManagedResource
	Ignorable        bool
}
```
The extender configuration struct is listed as well; its fields map directly to the corresponding entries in the policy file, so no further explanation is needed.
2.2.3 Generating factory.Config from the policy
```go
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
	klog.V(2).Infof("Creating scheduler from configuration: %v", policy)

	// validate the policy configuration
	if err := validation.ValidatePolicy(policy); err != nil {
		return nil, err
	}

	// collect the predicate keys;
	// if the policy specifies none, take the default provider's predicate keys
	predicateKeys := sets.NewString()
	if policy.Predicates == nil {
		klog.V(2).Infof("Using predicates from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		predicateKeys = provider.FitPredicateKeys
	} else {
		for _, predicate := range policy.Predicates {
			klog.V(2).Infof("Registering predicate: %s", predicate.Name)
			predicateKeys.Insert(RegisterCustomFitPredicate(predicate))
		}
	}

	// collect the priority keys;
	// if the policy specifies none, take the default provider's priority keys
	priorityKeys := sets.NewString()
	if policy.Priorities == nil {
		klog.V(2).Infof("Using priorities from algorithm provider '%v'", DefaultProvider)
		provider, err := GetAlgorithmProvider(DefaultProvider)
		if err != nil {
			return nil, err
		}
		priorityKeys = provider.PriorityFunctionKeys
	} else {
		for _, priority := range policy.Priorities {
			klog.V(2).Infof("Registering priority: %s", priority.Name)
			priorityKeys.Insert(RegisterCustomPriorityFunction(priority))
		}
	}

	// build the extenders
	var extenders []algorithm.SchedulerExtender
	if len(policy.ExtenderConfigs) != 0 {
		ignoredExtendedResources := sets.NewString()
		for ii := range policy.ExtenderConfigs {
			klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
			extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
			if err != nil {
				return nil, err
			}
			extenders = append(extenders, extender)
			for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
				if r.IgnoredByScheduler {
					ignoredExtendedResources.Insert(string(r.Name))
				}
			}
		}
		predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
	}

	// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
	// Give it higher precedence than scheduler CLI configuration when it is provided.
	if policy.HardPodAffinitySymmetricWeight != 0 {
		c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
	}

	// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
	// predicates even after one or more of them fails.
	if policy.AlwaysCheckAllPredicates {
		c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
	}

	// build the Config from the predicate, priority, and extender keys
	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
```
`CreateFromKeys` was already analyzed in [k8s源码分析][kube-scheduler]scheduler之启动run(1); here, pay particular attention to `extenders`.
```go
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) {
	klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)

	if c.GetHardPodAffinitySymmetricWeight() < 1 || c.GetHardPodAffinitySymmetricWeight() > 100 {
		return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.GetHardPodAffinitySymmetricWeight())
	}

	// resolve the predicate keys into the actual predicate functions
	predicateFuncs, err := c.GetPredicates(predicateKeys)
	if err != nil {
		return nil, err
	}

	// resolve the priority keys into the actual priority function configs
	priorityConfigs, err := c.GetPriorityFunctionConfigs(priorityKeys)
	if err != nil {
		return nil, err
	}

	// priorityMetaProducer is used when computing scores
	priorityMetaProducer, err := c.GetPriorityMetadataProducer()
	if err != nil {
		return nil, err
	}

	// predicateMetaProducer is used during the actual predicate phase
	predicateMetaProducer, err := c.GetPredicateMetadataProducer()
	if err != nil {
		return nil, err
	}

	// whether the equivalence class cache (which speeds up predicates) is enabled
	// Init equivalence class cache
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
		klog.Info("Created equivalence class cache")
	}

	// build the algorithm.ScheduleAlgorithm that performs the actual scheduling computation
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.equivalencePodCache,
		c.podQueue,
		predicateFuncs,
		predicateMetaProducer,
		priorityConfigs,
		priorityMetaProducer,
		extenders,
		c.volumeBinder,
		c.pVCLister,
		c.pdbLister,
		c.alwaysCheckAllPredicates,
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)
	...
}
```
3. Summary
This article walked through an example of using a custom scheduler, e.g. choosing exactly which predicate and priority methods to apply. A follow-up article will analyze how to use your own extender predicate and priority methods.