5.1 优选阶段
本文将承接上文[k8s源码分析][kube-scheduler]scheduler之调度之预选继续分析.
// PrioritizeNodes scores every node in nodes for pod and returns one
// HostPriority per node, in the same order as nodes.
//
// Each priority config contributes either through its legacy Function (scores
// all nodes in one call) or through Map (scores one node) followed by an
// optional Reduce over that config's per-node results. The per-config scores
// are multiplied by the config's Weight and summed per node. Scores returned
// by interested extenders, multiplied by the extender's weight, are then added.
//
// Any error from Function, Map or Reduce makes the call fail with an aggregate
// of all collected errors; errors from extenders are ignored.
func PrioritizeNodes(
pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
meta interface{},
priorityConfigs []algorithm.PriorityConfig,
nodes []*v1.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
// With neither priority configs nor extenders, every node takes its score
// straight from EqualPriorityMap.
if len(priorityConfigs) == 0 && len(extenders) == 0 {
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
result = append(result, hostPriority)
}
return result, nil
}
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
// appendError records err under mu so that concurrent workers can report failures.
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
// results[j][i] holds the score that priority config j gave to nodes[i].
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
// DEPRECATED: we can remove this when all priorityConfigs implement the
// Map-Reduce pattern.
// Legacy scoring: a config with a Function scores all nodes in a single call,
// run in its own goroutine. Configs without one get a pre-sized result row
// for the Map step below to fill in.
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
wg.Add(1)
go func(index int) {
defer wg.Done()
var err error
results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
}(i)
} else {
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
}
}
// Map step: for each node, run Map of every config that has no legacy Function.
// NOTE(review): the Reduce step below reads these rows, so this presumably
// blocks until all nodes are processed — confirm against workqueue.ParallelizeUntil.
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}
var err error
// Score this node with the config's Map function.
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
// Host is filled in here only on failure; on success the value
// returned by Map is stored unchanged.
results[i][index].Host = nodes[index].Name
}
}
})
for i := range priorityConfigs {
if priorityConfigs[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
defer wg.Done()
// Run the Reduce step over this config's per-node results.
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
}
}
}(i)
}
// Wait for all computations to be finished.
// This covers both the legacy Function goroutines and the Reduce goroutines.
wg.Wait()
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
// If there are extenders, ask each interested one to prioritize the nodes and
// add its weighted scores to the totals.
if len(extenders) != 0 && nodes != nil {
// combinedScores accumulates weighted extender scores keyed by host name; guarded by mu.
combinedScores := make(map[string]int, len(nodeNameToInfo))
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
}
wg.Add(1)
go func(extIndex int) {
defer wg.Done()
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
if err != nil {
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
return
}
mu.Lock()
for i := range *prioritizedList {
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
if klog.V(10) {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
}
combinedScores[host] += score * weight
}
mu.Unlock()
}(i)
}
// wait for all go routines to finish
wg.Wait()
for i := range result {
result[i].Score += combinedScores[result[i].Host]
}
}
if klog.V(10) {
for i := range result {
klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
}
}
return result, nil
}
这里比较简单, 就是调用所有的优选方法计算出所有节点所获得的分数. 如果既没有优选方法也没有扩展器, 则每个节点都是 1 分, 然后最终该调度器会从这些节点中任意选一个.
5.2 选择阶段
选择阶段主要是调用 selectHost 来从优选结果中选择最高分的节点, 如果最高分有多个节点, 那么会按 lastNodeIndex 轮询(round-robin)从这些节点中选择一个.
// findMaxScores returns the indexes of all entries in priorityList that share
// the highest score, in ascending index order.
//
// An empty priorityList yields nil instead of panicking on priorityList[0].
func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
	if len(priorityList) == 0 {
		return nil
	}

	maxScoreIndexes := make([]int, 0, len(priorityList)/2)
	maxScore := priorityList[0].Score
	for i, hp := range priorityList {
		switch {
		case hp.Score > maxScore:
			// A strictly better score invalidates everything collected so far.
			maxScore = hp.Score
			maxScoreIndexes = append(maxScoreIndexes[:0], i)
		case hp.Score == maxScore:
			maxScoreIndexes = append(maxScoreIndexes, i)
		}
	}
	return maxScoreIndexes
}
// selectHost returns the host name of one of the top-scoring entries in
// priorityList. When several entries tie for the highest score, successive
// calls rotate through them using g.lastNodeIndex, which is incremented on
// every successful call. An empty list is reported as an error.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
	if len(priorityList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}

	best := findMaxScores(priorityList)
	chosen := best[int(g.lastNodeIndex%uint64(len(best)))]
	g.lastNodeIndex++

	return priorityList[chosen].Host, nil
}
findMaxScores: 获得最高分节点的下标, 是一个数组.
selectHost: 从findMaxScores
返回中选择一个.
5.3 抢占
如果正常调度无法为当前
pod
分配一个节点, 那么就会尝试进行抢占(Preempt
)操作.
// Preempt looks for a node on which pod could run if some lower-priority pods
// were removed.
//
// It returns, in order: the node selected for preemption, the victim pods
// recorded for that node, and the lower-priority pods currently nominated to
// that node. All three are nil (with a nil error) when scheduleErr is not a
// *FitError, when the pod is not eligible to preempt, or when no candidate node
// is picked. When no node can be helped by preemption, only the third result is
// set, to the pod itself.
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, isFitError := scheduleErr.(*FitError)
	if !isFitError || fitError == nil {
		return nil, nil, nil, nil
	}

	// Check whether this pod is allowed to preempt at all.
	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}

	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}

	// Narrow down to nodes where preemption could make a difference.
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}

	pdbs, err := g.pdbLister.List(labels.Everything())
	if err != nil {
		return nil, nil, nil, err
	}

	// nodeToVictims maps each node on which the pod fits after preemption to
	// the victims selected on that node.
	nodeToVictims, err := selectNodesForPreemption(
		pod,
		g.cachedNodeInfoMap,
		potentialNodes,
		g.predicates,
		g.predicateMetaProducer,
		g.schedulingQueue,
		pdbs,
	)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	// Choose the single node on which preemption will actually happen.
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)

	nodeInfo, cached := g.cachedNodeInfoMap[candidateNode.Name]
	if !cached {
		return nil, nil, nil, fmt.Errorf(
			"preemption failed: the target node %s has been deleted from scheduler cache",
			candidateNode.Name)
	}
	return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
}
这里流程比较多. 这里简单说明, 在后面单独分析每个方法.
1. 调用podEligibleToPreemptOthers
方法判断当前pod
可不可以发生抢占行为.
2. 调用nodesWherePreemptionMightHelp
方法选一些潜在的节点.
3. 调用selectNodesForPreemption 方法模拟抢占, 这里会返回nodeToVictims, 代表那些发生抢占行为后能够运行该pod 的节点, 以及每个节点对应的需要删除的pods.
4. 调用processPreemptionWithExtenders
方法在nodeToVictims
的基础上在扩展项中再次过滤. 在默认调度器中没有任何扩展项, 在自定义调度器中有可能会有扩展项.
5. 调用pickOneNodeForPreemption
方法从nodeToVictims
选出一个最终发生抢占的节点(candidateNode
).
6. 调用getLowerPriorityNominatedPods
方法来获得在candidateNode
节点上比当前pod
优先级低的nominated pods
. 因为这些nominated pods
在该pod
成为nominated pod
后有可能是无法运行在candidateNode
节点上的, 所以需要从scheduling_queue
中删除这些nominated pods
.
5.3.1 podEligibleToPreemptOthers
返回true, 表明该pod 可以去尝试抢占; 返回false, 表明该pod 不应该被考虑进行抢占(shouldn't be considered for preemption).
// pkg/scheduler/core/generic_scheduler.go
// podEligibleToPreemptOthers reports whether pod may attempt preemption.
//
// It returns false only when the pod already has a nominated node, that node is
// present in nodeNameToInfo, and the node holds at least one pod that is being
// deleted (DeletionTimestamp set) and has a lower priority than pod. In every
// other case it returns true.
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
	nominatedNode := pod.Status.NominatedNodeName
	if len(nominatedNode) == 0 {
		return true
	}

	info, known := nodeNameToInfo[nominatedNode]
	if !known {
		return true
	}

	for _, existing := range info.Pods() {
		if existing.DeletionTimestamp == nil {
			continue
		}
		if util.GetPodPriority(existing) < util.GetPodPriority(pod) {
			// There is a terminating pod on the nominated node.
			return false
		}
	}
	return true
}
1. 如果该
pod
之前没有抢占过, 则直接返回true
.
2. 如果该
pod
以前发生抢占过(pod.Status.NominatedNodeName
有值),
只要该nominated node
中有任何优先级比当前pod
低的pod
正在做删除操作, 那就返回false
. (因为说明该nominated node
正在为这个nominated pod
(就是当前pod
)进行删除那些被确定为牺牲者的pods
). 不然还是返回true
.
5.3.2 nodesWherePreemptionMightHelp
这里主要考虑的是该
pod
在预选阶段有些节点因为某些错误不能调度, 那同样的在抢占阶段这些节点同样还是会因为这些原因而无法运行该pod
. 这些错误包括ErrNodeSelectorNotMatch
等等.
所以因为这些错误无法调度
pod
的节点不会成为潜在的抢占节点, 反之不是因为这些错误的节点会成为潜在的抢占节点.
// nodesWherePreemptionMightHelp returns the subset of nodes that are worth
// considering for preemption: every node whose recorded predicate failures in
// failedPredicatesMap contain none of the failure reasons listed in the switch
// below. A node with no recorded failures is always included. The result is
// never nil.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
	potentialNodes := []*v1.Node{}
	for _, node := range nodes {
		unresolvableReasonExist := false

	scan:
		for _, failedPredicate := range failedPredicatesMap[node.Name] {
			switch failedPredicate {
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodAffinityRulesNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrTaintsTolerationsNotMatch,
				predicates.ErrNodeLabelPresenceViolated,
				predicates.ErrNodeNotReady,
				predicates.ErrNodeNetworkUnavailable,
				predicates.ErrNodeUnderDiskPressure,
				predicates.ErrNodeUnderPIDPressure,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeOutOfDisk,
				predicates.ErrNodeUnschedulable,
				predicates.ErrNodeUnknownCondition,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrVolumeNodeConflict,
				predicates.ErrVolumeBindConflict:
				unresolvableReasonExist = true
				// A bare break would only leave the switch and keep scanning
				// the remaining failures; one match is enough to rule the
				// node out, so leave the loop as well.
				break scan
			}
		}

		if !unresolvableReasonExist {
			klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
			potentialNodes = append(potentialNodes, node)
		}
	}
	return potentialNodes
}
5.3.3 selectNodesForPreemption
这里最多启动 16 个goroutine 并发执行selectVictimsOnNode. selectVictimsOnNode 会返回传入的节点是否fits; 如果fits, 就把它返回的要删除的pods 和pdbViolations 的个数封装成Victims, 然后加入到nodeToVictims 中.
// selectNodesForPreemption runs selectVictimsOnNode for every node in
// potentialNodes, with a parallelism of 16, and returns a map from each node on
// which the pod fits to the victims selected there (the pods plus the number of
// PDB violations). The returned error is always nil.
func selectNodesForPreemption(pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	potentialNodes []*v1.Node,
	predicates map[string]algorithm.FitPredicate,
	metadataProducer algorithm.PredicateMetadataProducer,
	queue internalqueue.SchedulingQueue,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	var resultLock sync.Mutex
	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}

	// We can use the same metadata producer for all nodes.
	meta := metadataProducer(pod, nodeNameToInfo)

	checkNode := func(i int) {
		node := potentialNodes[i]

		// Each worker gets its own shallow copy of the metadata.
		var metaCopy algorithm.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}

		pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[node.Name], predicates, queue, pdbs)
		if !fits {
			return
		}

		// The result map is shared between workers, so writes are serialized.
		resultLock.Lock()
		nodeToVictims[node] = &schedulerapi.Victims{
			Pods:             pods,
			NumPDBViolations: numPDBViolations,
		}
		resultLock.Unlock()
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)

	return nodeToVictims, nil
}
5.3.3.1 selectVictimsOnNode
// selectVictimsOnNode decides which pods on the node described by nodeInfo
// would have to be removed for pod to fit there.
//
// It works on a clone of nodeInfo: all pods with a priority lower than pod are
// removed first; if pod still does not fit, the node is rejected. Otherwise
// the removed pods are added back one at a time, and each one whose return
// makes pod stop fitting is removed again and recorded as a victim.
//
// It returns the victims, the number of victims that came from the
// PDB-violating group, and whether pod fits on the node at all. A nil nodeInfo
// yields (nil, 0, false).
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate,
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
// removePod and addPod keep nodeInfoCopy and meta (when present) in step.
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
// Collect the potential victims: every pod on the node whose priority is
// lower than the incoming pod's.
// NOTE(review): removePod mutates nodeInfoCopy while the result of its Pods()
// is being ranged over — confirm RemovePod tolerates this.
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
// Sort the potential victims from highest to lowest priority.
potentialVictims.Sort()
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption. The only condition
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
// If the pod still cannot run here with every potential victim removed, give
// up on this node right away.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
// Try to reprieve as many pods as possible. We first try to reprieve the PDB
// violating victims and then other non-violating ones. In both cases, we start
// from the highest priority victims.
// Split the potential victims by PDB into violatingVictims and nonViolatingVictims.
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
// reprievePod reports whether p can stay on the node; when it cannot, p is
// removed again and appended to victims.
reprievePod := func(p *v1.Pod) bool {
// Try adding p back to the node, i.e. try not to evict it.
addPod(p)
// Only the fits result is used here; the error from podFitsOnNode is discarded.
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
if !fits {
// p has to be evicted.
removePod(p)
victims = append(victims, p)
klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
}
return fits
}
// Both groups are tried in order, highest priority first. Reprieving p means
// adding it back (not evicting it): if the incoming pod no longer fits, p
// becomes a real victim; if it still fits, p is spared.
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
// Now we try to reprieve non-violating victims.
for _, p := range nonViolatingVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true
}
这个算法的意思:
1. 找到该节点潜在要删除的pods
, 就是那些优先级比当前pod
优先级低的pods
. 并且将这些potentialVictims
先从nodeInfo
中删除.
2. 将这些潜在要删除的pods 按照优先级从高到低排序.
3. 将排好序的potentialVictims 按优先级从高到低一个个尝试加回去(先尝试违反PDB 的, 再尝试不违反PDB 的), 看看当前pod 是否仍然可以运行. 如果加回去之后仍然满足, 该potentialVictim 就被保留(不用删除); 如果不满足, 就把它重新移除并加入victims, 也就是最终需要删除的pods.
用个例子来说明吧:
5.3.4 processPreemptionWithExtenders
这里就不多说了, 就是将返回
nodeToVictims
再按扩展项部分过滤一次.
5.3.5 pickOneNodeForPreemption
// pickOneNodeForPreemption chooses one node from nodesToVictims, or returns nil
// when the map is empty.
//
// A node whose victim list is empty is returned as soon as it is encountered.
// Otherwise the candidates are narrowed step by step, returning as soon as a
// single node remains:
//  1. fewest PDB violations;
//  2. lowest priority of the first victim in the node's victim list;
//  3. smallest sum of victim priorities;
//  4. fewest victims.
// If several nodes are still tied after step 4, the first of them is returned.
// minNodes1 and minNodes2 are scratch buffers that alternate between the input
// and output role from one step to the next.
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// We found a node that doesn't need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
// No pod has to be removed on this node, so return it immediately.
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// There are more than one node with minimum number PDB violating pods. Find
// the one with minimum highest priority victim.
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
// NOTE(review): this takes Pods[0] as the highest-priority victim, which relies
// on the producer ordering victims.Pods by descending priority.
// selectVictimsOnNode appends PDB-violating victims before non-violating ones,
// so the first entry may not be the overall highest — confirm.
highestPodPriority := util.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
lenNodes2 = 0
}
if highestPodPriority == minHighestPriority {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
if lenNodes2 == 1 {
return minNodes2[0]
}
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
// minNodes1 is reused as the output buffer for this step.
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
for _, pod := range nodesToVictims[node].Pods {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
// minNodes2 is reused as the output buffer for this step.
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// At this point, even if there are more than one node with the same score,
// return the first one.
if lenNodes2 > 0 {
return minNodes2[0]
}
klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil
}
1. 选择那些具有最少
PDB violations
的节点, 如果只有一个, 直接返回. (A node with minimum number of PDB violations.
)
2. 在1.的基础上, 选择牺牲者中最高优先级最小的节点, 如果只有一个, 直接返回. (A node with minimum highest priority victim is picked.)
3. 在2.的基础上选择那些牺牲者优先级之和最小的节点, 如果只有一个, 直接返回.(Ties are broken by sum of priorities of all victims.)
4. 在3.的基础上选择牺牲者个数最少的节点, 如果只有一个, 直接返回.(node with the minimum number of victims is picked.
)
5. 在4.的基础上选择第一个.(the first such node is picked (sort of randomly).
)
5.3.6 getLowerPriorityNominatedPods
那些优先级比当前pod 优先级低的nominated pods 有可能不再适合运行在candidateNode 节点上了, 所以需要把这些nominatedPods 找到.
// getLowerPriorityNominatedPods returns the pods that the scheduling queue
// lists as nominated for nodeName and whose priority is strictly lower than
// pod's. It returns nil when the queue lists no nominated pods for the node or
// when none of them has a lower priority.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
	nominated := g.schedulingQueue.NominatedPodsForNode(nodeName)
	if len(nominated) == 0 {
		return nil
	}

	threshold := util.GetPodPriority(pod)
	var lower []*v1.Pod
	for i := range nominated {
		if candidate := nominated[i]; util.GetPodPriority(candidate) < threshold {
			lower = append(lower, candidate)
		}
	}
	return lower
}