1. Preface
If you repost this article, please credit the original source and respect the author's work!
This article analyzes custom schedulers for kube-scheduler, focusing on a custom scheduler that uses extenders.
Source code: https://github.com/nicktming/kubernetes
Branch: tming-v1.13 (based on v1.13)
2. Example
2.1 Preparation
2.1.1 Start kube-scheduler with a configuration file
Run kube-scheduler with the following configuration on one machine (worker, 172.21.0.12).
The schedulerConfig.yaml file:
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
schedulerName: my-scheduler
algorithmSource:
  policy:
    file:
      path: policy.yaml
leaderElection:
  leaderElect: true
  lockObjectName: my-scheduler
  lockObjectNamespace: kube-system
The policy.yaml file:
{
    "kind" : "Policy",
    "apiVersion" : "v1",
    "predicates" : [
        {"name" : "PodFitsHostPorts"},
        {"name" : "PodFitsResources"},
        {"name" : "NoDiskConflict"},
        {"name" : "MatchNodeSelector"},
        {"name" : "HostName"}
    ],
    "priorities" : [
        {"name" : "LeastRequestedPriority", "weight" : 1},
        {"name" : "BalancedResourceAllocation", "weight" : 1},
        {"name" : "ServiceSpreadingPriority", "weight" : 1},
        {"name" : "EqualPriority", "weight" : 1}
    ],
    "extenders" : [{
        "urlPrefix": "http://localhost/scheduler",
        "filterVerb": "predicates/always_true",
        "prioritizeVerb": "priorities/zero_score",
        "preemptVerb": "preemption",
        "bindVerb": "",
        "weight": 1,
        "enableHttps": false,
        "nodeCacheCapable": false
    }],
    "hardPodAffinitySymmetricWeight" : 10
}
Run the command ./kube-scheduler --master=http://172.21.0.16:8080 --config=schedulerConfig.yaml.
Note that urlPrefix is the address at which your custom kube-scheduler reaches your extender service. Since both services are deployed on the same machine here, "urlPrefix": "http://localhost/scheduler" is used.
2.1.2 Start the extender service
Download k8s-scheduler-extender-example to the same machine (worker, 172.21.0.12) and run it.
[root@worker nicktming]# pwd
/root/go/src/github.com/nicktming
[root@worker nicktming]# git clone https://github.com/nicktming/k8s-scheduler-extender-example.git
[root@worker nicktming]# cd k8s-scheduler-extender-example
[root@worker k8s-scheduler-extender-example]# go build .
// If you do not have a Go environment, you can use the prebuilt binary k8s-scheduler-extender-example shipped with the repository
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
The extender service is just an HTTP API service; kube-scheduler sends requests to it according to the configuration file.
Take the predicate (filter) phase as an example.
Request: kube-scheduler sends a schedulerapi.ExtenderArgs object to the extender; it contains the candidate nodes and the pod being scheduled.
type ExtenderArgs struct {
    // the pod being scheduled
    Pod *v1.Pod
    // the candidate nodes
    Nodes     *v1.NodeList
    NodeNames *[]string
}
Response: after processing, the extender returns a schedulerapi.ExtenderFilterResult object that tells kube-scheduler which nodes the pod can run on and which it cannot.
type ExtenderFilterResult struct {
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == false
    Nodes *v1.NodeList
    // Filtered set of nodes where the pod can be scheduled; to be populated
    // only if ExtenderConfig.NodeCacheCapable == true
    NodeNames *[]string
    // Filtered out nodes where the pod can't be scheduled and the failure messages
    FailedNodes FailedNodesMap
    // error message
    Error string
}
The extender's Handler accepts a schedulerapi.ExtenderArgs object and, after processing, returns a schedulerapi.ExtenderFilterResult object to kube-scheduler.
// main.go
TruePredicate = Predicate{
    Name: "always_true",
    Func: func(pod v1.Pod, node v1.Node) (bool, error) {
        if node.Name == "172.21.0.16" {
            return false, nil
        }
        return true, nil
    },
}

// predicate.go
type Predicate struct {
    Name string
    Func func(pod v1.Pod, node v1.Node) (bool, error)
}
func (p Predicate) Handler(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
    pod := args.Pod
    canSchedule := make([]v1.Node, 0, len(args.Nodes.Items))
    canNotSchedule := make(map[string]string)
    for _, node := range args.Nodes.Items {
        // call this predicate's own logic to decide whether the pod can run on the node
        result, err := p.Func(*pod, node)
        fmt.Printf("===>extender node:%v, result:%v\n", node.Name, result)
        if err != nil {
            canNotSchedule[node.Name] = err.Error()
        } else {
            if result {
                canSchedule = append(canSchedule, node)
            }
        }
    }
    // build the return value
    result := schedulerapi.ExtenderFilterResult{
        Nodes: &v1.NodeList{
            Items: canSchedule,
        },
        FailedNodes: canNotSchedule,
        Error:       "",
    }
    return &result
}
kube-scheduler calls urlPrefix+filterVerb from the policy.yaml configuration (shown earlier), i.e. http://localhost/scheduler/predicates/always_true, which routes into the Handler method above and invokes the always_true filter method.
The extender service's concrete request-routing logic is simple enough to read on your own; a rough sketch of what such routing can look like follows.
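The sketch below is not the exact routes.go of k8s-scheduler-extender-example; it only illustrates the idea. It assumes the Predicate type and TruePredicate value from the snippets above live in the same package, hard-codes the /scheduler/predicates/always_true path and port 80 to match the policy.yaml used here, and uses the v1.13-era schedulerapi import path.

// routing_sketch.go (illustrative only)
package main

import (
    "encoding/json"
    "log"
    "net/http"

    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

// predicateRoute wires one Predicate to urlPrefix+filterVerb,
// e.g. /scheduler/predicates/always_true.
func predicateRoute(p Predicate) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // kube-scheduler POSTs an ExtenderArgs JSON body.
        var args schedulerapi.ExtenderArgs
        if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        // Run the predicate and reply with an ExtenderFilterResult JSON body.
        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(p.Handler(args)); err != nil {
            log.Printf("encode result: %v", err)
        }
    }
}

func main() {
    http.HandleFunc("/scheduler/predicates/always_true", predicateRoute(TruePredicate))
    log.Fatal(http.ListenAndServe(":80", nil))
}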
2.2 Verification
2.2.1 Create a pod with schedulerName: my-scheduler
[root@master kubectl]# cat pod-scheduler.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-schduler
spec:
  schedulerName: my-scheduler
  containers:
  - name: podtest-scheduler
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod-scheduler.yaml
pod/test-schduler created
[root@master kubectl]#
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test-schduler 1/1 Running 0 17s
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i nodeName
nodeName: 172.21.0.12
[root@master kubectl]# ./kubectl get pods test-schduler -o yaml | grep -i schedulerName
{"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{},"name":"test-schduler","namespace":"default"},"spec":{"containers":[{"image":"nginx","name":"podtest-scheduler","ports":[{"containerPort":80}]}],"schedulerName":"my-scheduler"}}
schedulerName: my-scheduler
[root@master kubectl]#
Meanwhile, check the extender service's log:
[root@worker k8s-scheduler-extender-example]# ./k8s-scheduler-extender-example
[ warn ] 2019/10/16 16:16:50 main.go:87: LOG_LEVEL="" is empty or invalid, fallling back to "INFO".
[ info ] 2019/10/16 16:16:50 main.go:101: Log level was set to INFO
[ info ] 2019/10/16 16:16:50 main.go:119: server starting on the port :80
[ info ] 2019/10/16 16:19:51 routes.go:29: always_true ExtenderArgs =
===>extender node:172.21.0.16, result:false
===>extender node:172.21.0.12, result:true
[ info ] 2019/10/16 16:19:51 routes.go:49: always_true extenderFilterResult = {"Nodes":{"metadata":{},"items":[{"metadata":{"name":"172.21.0.12","selfLink":"/api/v1/nodes/172.21.0.12","uid":"ec8685f1-ef5f-11e9-8482-525400d54f7e","resourceVersion":"23957","creationTimestamp":"2019-10-15T15:24:48Z","labels":{"beta.kubernetes.io/arch":"amd64","beta.kubernetes.io/os":"linux","kubernetes.io/hostname":"172.21.0.12"},"annotations":{"node.alpha.kubernetes.io/ttl":"0","volumes.kubernetes.io/controller-managed-attach-detach":"true"}},"spec":{},"status":{"capacity":{"cpu":"2","ephemeral-storage":"51473888Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3880944Ki","pods":"110"},"allocatable":{"cpu":"2","ephemeral-storage":"47438335103","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"3778544Ki","pods":"110"},"conditions":[{"type":"MemoryPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientMemory","message":"kubelet has sufficient memory available"},{"type":"DiskPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasNoDiskPressure","message":"kubelet has no disk pressure"},{"type":"PIDPressure","status":"False","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletHasSufficientPID","message":"kubelet has sufficient PID available"},{"type":"Ready","status":"True","lastHeartbeatTime":"2019-10-16T08:19:49Z","lastTransitionTime":"2019-10-16T04:15:19Z","reason":"KubeletReady","message":"kubelet is posting ready status"}],"addresses":[{"type":"InternalIP","address":"172.21.0.12"},{"type":"Hostname","address":"172.21.0.12"}],"daemonEndpoints":{"kubeletEndpoint":{"Port":10250}},"nodeInfo":{"machineID":"c28d40cbc8e3adcb4e32d9779a77b39e","systemUUID":"2C6B0169-85AC-48F3-9377-35EFC668E23C","bootID":"f5081260-8e17-446c-9b2c-8c2766e49e0e","kernelVersion":"3.10.0-862.el7.x86_64","osImage":"CentOS Linux 7 (Core)","containerRuntimeVersion":"docker://17.9.1","kubeletVersion":"v0.0.0-master+$Format:%h$","kubeProxyVersion":"v0.0.0-master+$Format:%h$","operatingSystem":"linux","architecture":"amd64"},"images":[{"names":["nginx@sha256:aeded0f2a861747f43a01cf1018cf9efe2bdd02afd57d2b11fcc7fcadc16ccd1","nginx:latest"],"sizeBytes":125952483},{"names":["mirrorgooglecontainers/pause@sha256:59eec8837a4d942cc19a52b8c09ea75121acc38114a2c68b98983ce9356b8610","mirrorgooglecontainers/pause:3.1"],"sizeBytes":742472},{"names":["hello-world@sha256:c3b4ada4687bbaa170745b3e4dd8ac3f194ca95b2d0518b417fb47e5879d9b5f","hello-world:latest"],"sizeBytes":1840}]}}]},"NodeNames":null,"FailedNodes":{},"Error":""}
The log shows that kube-scheduler did call the extender service, and that the test-schduler pod was not placed on node 172.21.0.16.
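You can also exercise the extender endpoint directly, without going through kube-scheduler. The following is a minimal, illustrative client that POSTs a hand-written ExtenderArgs payload to the always_true predicate; it assumes the extender from section 2.1.2 is listening on port 80 of the local machine.

// extender_probe.go (illustrative only)
package main

import (
    "bytes"
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // A minimal ExtenderArgs payload: the pod being scheduled plus two candidate nodes.
    body := []byte(`{
        "Pod":   {"metadata": {"name": "test-schduler"}},
        "Nodes": {"items": [
            {"metadata": {"name": "172.21.0.16"}},
            {"metadata": {"name": "172.21.0.12"}}
        ]}
    }`)
    resp, err := http.Post("http://localhost/scheduler/predicates/always_true",
        "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // Expect an ExtenderFilterResult whose Nodes list only contains 172.21.0.12,
    // because always_true rejects node 172.21.0.16.
    out, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(string(out))
}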
2.2.2 Create a pod without schedulerName
This pod is scheduled by the default scheduler, and it is also created successfully.
[root@master kubectl]# cat pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
  - name: podtest
    image: nginx
    ports:
    - containerPort: 80
[root@master kubectl]# ./kubectl apply -f pod.yaml
[root@master kubectl]# ./kubectl get pods
NAME READY STATUS RESTARTS AGE
test 1/1 Running 0 2m39s
test-schduler 1/1 Running 0 24m
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i nodeName
nodeName: 172.21.0.16
[root@master kubectl]# ./kubectl get pod test -o yaml | grep -i schedulerName
schedulerName: default-scheduler
[root@master kubectl]#
3. Analysis of the relevant source code
The previous article [k8s源码分析][kube-scheduler]scheduler之自定义调度器(1) already analyzed how kube-scheduler --config loads the configuration file, so here we focus on the parts that interact with the extender service.
In CreateFromConfig you can see that extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii]) creates an extender from the ii-th entry of extenders in policy.yaml.
// pkg/scheduler/factory/factory.go
// Creates a scheduler from the configuration file
func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*Config, error) {
    // build the extenders
    var extenders []algorithm.SchedulerExtender
    if len(policy.ExtenderConfigs) != 0 {
        ignoredExtendedResources := sets.NewString()
        for ii := range policy.ExtenderConfigs {
            klog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
            extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
            if err != nil {
                return nil, err
            }
            extenders = append(extenders, extender)
            for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
                if r.IgnoredByScheduler {
                    ignoredExtendedResources.Insert(string(r.Name))
                }
            }
        }
        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
    }
    ...
}
3.1 The extender interface
type SchedulerExtender interface {
    // Name returns a unique name that identifies the extender.
    // The extender's name, i.e. config.URLPrefix
    Name() string
    // The filter method, equivalent to a predicate
    Filter(pod *v1.Pod,
        nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
    // Scoring
    Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error)
    // Bind delegates the action of binding a pod to a node to the extender.
    Bind(binding *v1.Binding) error
    // IsBinder returns whether this extender is configured for the Bind method.
    IsBinder() bool
    // IsInterested returns true if at least one extended resource requested by
    // this pod is managed by this extender.
    IsInterested(pod *v1.Pod) bool
    // ProcessPreemption returns nodes with their victim pods processed by extender based on
    // given:
    //   1. Pod to schedule
    //   2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
    //   3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
    // The possible changes made by extender may include:
    //   1. Subset of given candidate nodes after preemption phase of extender.
    //   2. A different set of victim pod for every given candidate node after preemption phase of extender.
    ProcessPreemption(
        pod *v1.Pod,
        nodeToVictims map[*v1.Node]*schedulerapi.Victims,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
    ) (map[*v1.Node]*schedulerapi.Victims, error)
    // SupportsPreemption returns if the scheduler extender support preemption or not.
    // Whether preemption is supported
    SupportsPreemption() bool
    // Whether errors can be tolerated:
    // if true, an error from this extender is simply skipped;
    // if false, an error from this extender makes the scheduler return immediately.
    IsIgnorable() bool
}
3.2 HTTPExtender, the implementation of the extender interface
An HTTPExtender is built from a schedulerapi.ExtenderConfig, which includes creating the HTTP client used to talk to the extender.
// HTTPExtender implements the algorithm.SchedulerExtender interface.
type HTTPExtender struct {
    extenderURL      string
    preemptVerb      string
    filterVerb       string
    prioritizeVerb   string
    bindVerb         string
    weight           int
    client           *http.Client
    nodeCacheCapable bool
    managedResources sets.String
    ignorable        bool
}
func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
    if config.HTTPTimeout.Nanoseconds() == 0 {
        config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
    }
    transport, err := makeTransport(config)
    if err != nil {
        return nil, err
    }
    client := &http.Client{
        Transport: transport,
        Timeout:   config.HTTPTimeout,
    }
    managedResources := sets.NewString()
    for _, r := range config.ManagedResources {
        managedResources.Insert(string(r.Name))
    }
    return &HTTPExtender{
        extenderURL:      config.URLPrefix,
        preemptVerb:      config.PreemptVerb,
        filterVerb:       config.FilterVerb,
        prioritizeVerb:   config.PrioritizeVerb,
        bindVerb:         config.BindVerb,
        weight:           config.Weight,
        client:           client,
        nodeCacheCapable: config.NodeCacheCapable,
        managedResources: managedResources,
        ignorable:        config.Ignorable,
    }, nil
}
3.3 Interacting with the extender
The different phases are all broadly similar, so let's take the predicate (filter) phase as an example.
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    ...
    // the nodes that passed the built-in predicates are filtered again, one extender at a time
    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            // on error, return unless the extender is marked ignorable
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }
            for failedNodeName, failedMsg := range failedMap {
                // if the node is not yet in failedPredicateMap, add an entry for it
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    // return the final filtered list of feasible nodes,
    // plus failedPredicateMap with the failing nodes and their reasons
    return filtered, failedPredicateMap, nil
}
As shown above, after the built-in predicates produce a filtered set of nodes, each extender in extenders filters that set again by calling its own Filter method.
func (h *HTTPExtender) Filter(
    pod *v1.Pod,
    nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo,
) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
    var (
        result     schedulerapi.ExtenderFilterResult
        nodeList   *v1.NodeList
        nodeNames  *[]string
        nodeResult []*v1.Node
        args       *schedulerapi.ExtenderArgs
    )
    if h.filterVerb == "" {
        return nodes, schedulerapi.FailedNodesMap{}, nil
    }
    // if nodeCacheCapable is true, send only node names (nodeNames);
    // otherwise send the full node objects (nodeList)
    if h.nodeCacheCapable {
        nodeNameSlice := make([]string, 0, len(nodes))
        for _, node := range nodes {
            nodeNameSlice = append(nodeNameSlice, node.Name)
        }
        nodeNames = &nodeNameSlice
    } else {
        nodeList = &v1.NodeList{}
        for _, node := range nodes {
            nodeList.Items = append(nodeList.Items, *node)
        }
    }
    // assemble the ExtenderArgs to send to the extender service
    args = &schedulerapi.ExtenderArgs{
        Pod:       pod,
        Nodes:     nodeList,
        NodeNames: nodeNames,
    }
    // POST the request to the corresponding API path
    if err := h.send(h.filterVerb, args, &result); err != nil {
        return nil, nil, err
    }
    if result.Error != "" {
        return nil, nil, fmt.Errorf(result.Error)
    }
    // extract the result
    if h.nodeCacheCapable && result.NodeNames != nil {
        nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
        for i := range *result.NodeNames {
            nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
        }
    } else if result.Nodes != nil {
        nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
        for i := range result.Nodes.Items {
            nodeResult = append(nodeResult, &result.Nodes.Items[i])
        }
    }
    return nodeResult, result.FailedNodes, nil
}
The Filter method sends a request to this extender's filterVerb path (predicates/always_true) appended to the extender's extenderURL (http://localhost/scheduler), so the full URL is http://localhost/scheduler/predicates/always_true. This brings us back to the extender service analyzed earlier, which handles the request in its Handler method; the response is then decoded into result for Filter to use.
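For completeness, h.send essentially JSON-encodes args, POSTs it to extenderURL/<verb> with Content-Type: application/json, and JSON-decodes the response into result. The following is a simplified sketch of that behavior, not a verbatim copy of the v1.13 source; it relies on the HTTPExtender struct shown above plus the bytes, encoding/json, fmt, net/http and strings packages.

// A simplified sketch of HTTPExtender.send: encode the request, POST it to
// extenderURL/<action>, and decode the JSON response into result.
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
    out, err := json.Marshal(args)
    if err != nil {
        return err
    }
    url := strings.TrimRight(h.extenderURL, "/") + "/" + action
    req, err := http.NewRequest("POST", url, bytes.NewReader(out))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    resp, err := h.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
    }
    return json.NewDecoder(resp.Body).Decode(result)
}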
4. Summary
Note that k8s-scheduler-extender-example itself deploys a scheduler in the form of a configMap plus a Deployment, whose pod has two containers.
The first container, my-scheduler-ctr, uses the image gcr.io/google_containers/hyperkube:v1.11.1, which bundles the compiled k8s components; it runs the kube-scheduler component and loads the configuration file from the configmap, after which everything proceeds as analyzed in [k8s源码分析][kube-scheduler]scheduler之自定义调度器(1).
The second container, my-scheduler-extender-ctr, is the extender service itself, just packaged as an image.
The two containers run in the same pod, so they can reach each other via localhost.
containers:
- name: my-scheduler-ctr
  image: gcr.io/google_containers/hyperkube:v1.11.1
  imagePullPolicy: IfNotPresent
  args:
  - kube-scheduler
  - --config=/my-scheduler/config.yaml
  - -v=4
  volumeMounts:
  - name: my-scheduler-config
    mountPath: /my-scheduler
- name: my-scheduler-extender-ctr
  image: a/b:c
  imagePullPolicy: IfNotPresent
In this way the custom scheduler is deployed to the k8s cluster in plugin form.