k8s调度器扩展

调度需求

Gang调度(pod group):部署一个工作负载时,满足一定数量的pod能调度成功,即可以正常调度,不然全部调度失败
-- 例如:模型训练时,算法要求至少运行8个任务并行作业才可以正常训练。当资源不足够时只能运行6个任务时,则希望这6个任务也不要运行(浪费计算资源)。
装箱调度
-- 减少对GPU卡的碎片化
-- 减少GPU卡的碎片化作用比较多:1. 资源的更高效利用,节约成本 2. vnlink模式的gpu,在同一台机器上有更高的性能
-- 用例:假设node的空闲的卡数为m,pod request的卡数为n,m - n的值越小,score值越高(m - n最小为0,n > m的情况会在filter直接过滤掉)
rescheduler
-- 用例:集群负载拓扑node1(podA(4卡), podB(3卡)),node2()。开始调度podC(4卡) -> 触发rescheduler,node1(podA(4卡), podC(4卡)) node2(podA(3卡))
app dependence:可以配置应用调度的依赖关系
-- 用例: 模型训练时,任务1运行生成结果A,任务2运行依赖A
节点池:可以将集群中的节点资源划分成不同的节点池
-- 场景1:部分节点作为节点池分给独占租户使用
-- 场景2:节点的硬件质量、厂家,用途等不同,可以划分成不同的节点池。例如:旧机器划分成一个节点池,新机器划分到一个节点池,想要进行机器更换或者迁移时可以做到更高的可控性
自定义调度算法:用户在在filter、score等扩展点根据业务规则自定义算法
-- 用例:最优线路调度 slurm on k8s调度方案
-- 用户可以自定义扩展算法对节点进行调度,例如:slurm on k8s的方案,考虑到最好的性能,希望slurm cluster最好运行在一个leaf Pod网络内,且尽可能占用最少的leaf Pod数,对leaf Pod网络拓扑碎片化的影响最小
多调度策略:集群支持配置多调度策略,业务可以选择适合的调度策略进行调度
-- 例如: GPU服务集群(如slurm on k8s)调度时对pod之间最优路径有需求走自己的调度器,其他的CPU服务调度时走k8s默认调度器
-- 可以考虑在同一个调度器内根据pod标签进行逻辑划分,这样的设计功能比较耦合,对平台的高可用性也不太友好。
-- 可以考虑使用多调度器(调度器就是插件的组合),调度器名称的配置交给应用还是webhook实现或者其他方式需要考虑。
-- 同一个VC是同一类应用,走同一个调度策略?需要根据当前VC的业务场景确定调度策略
并发调度
-- 通过并发调度提效
-- k8s默认调度器是串行进行调度,支持配置多调度器,但是在资源紧张的情况下,由于不同的调度器之间的资源同步延迟,可能导致资源冲突
-- 对集群资源进行划分,走不通的调度器进行并发调度时则不会导致资源冲突

k8s默认调度器

image.png

核心流程:

Predicates: 预选,按照调度策略,从当前集群的所有节点中,“过滤”出一系列符合条件的节点。这些节点,都是可以运行待调度 Pod 的宿主机
Priorities: 优选,对预选完成的节点打分。这里打分的范围是 0-10 分,得分最高的节点就是最后被 Pod 绑定的最佳节点

k8s默认调度器扩展

一、扩展点介绍

k8s默认调度器是通过scheduler framework实现的,scheduler framework预留了一些扩展点,可以通过scheduler framework扩展点对调度器进行扩展


image.png

二、对扩展点进行扩展

package networkspeed

import (
    "context"
    "errors"
    "fmt"
    "os"
    "slices"
    "strconv"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

const PluginName = "NetworkSpeed"

type Config struct {
    Selector  map[string]string `json:"selector"`
    Namespace string            `json:"namespace"`
    Timeout   int64             `json:"timeout"`
    Port      int               `json:"port"`
}

type NetworkSpeedPlugin struct {
    config *Config
    handle framework.Handle
}

func (n *NetworkSpeedPlugin) Name() string {
    return PluginName
}

func (n *NetworkSpeedPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    target, module, ok := getProbeConfig(p)
    if !ok {
        return 0, nil
    }

    duration, err := n.doProbe(ctx, nodeName, target, module)
    if err != nil {
        klog.Error("doProbe error", "err", err.Error())
        return 0, nil
    }

    klog.InfoS(
        "doProbe",
        "node", nodeName,
        "target", target,
        "pod", p.Namespace+"/"+p.Name,
        "duration", duration,
    )
    annos := p.GetAnnotations()
    klog.Infof("annos: %v", annos)
    k := annos[nodeName]
    klog.Infof("node %s k: %s", nodeName, k)

    return duration.Microseconds(), nil
}

func (n *NetworkSpeedPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if _, err := n.getProber(nodeInfo.Node().Name); err != nil {
        klog.ErrorS(
            err, "Filter error",
            "node", nodeInfo.Node().Name,
            "pod", pod.Namespace+"/"+pod.Name,
        )
        return framework.NewStatus(framework.Unschedulable)
    }

    annotations := pod.GetAnnotations()
    annotations[nodeInfo.Node().GetName()] = "1" // k
    pod.SetAnnotations(annotations)
    return nil
}

func (n *NetworkSpeedPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
    // 1. 获取pod group的实例个数
    podAnnotations := pod.GetAnnotations()
    podGroupReplicas, _ := strconv.Atoi(podAnnotations["replicas"])
    // 2. 遍历nodes, 计算node所属leafpod node的个数
    nodeLeafPodCount := make(map[string]int)
    for _, node := range nodes {
        nodeAnnotations := node.GetAnnotations()
        if v, ok := nodeAnnotations["leafpod"]; ok {
            nodeLeafPodCount[v]++
        }
    }
    klog.Infof("nodeLeafPodCount: %v", nodeLeafPodCount)
    // 3. 遍历node, 计算k值
    for _, node := range nodes {
        nodeAnnotations := node.GetAnnotations()
        if v, ok := nodeAnnotations["leafpod"]; ok {
            klog.Infof("node %s leafpod count: %d, k: %d", node.GetName(), nodeLeafPodCount[v], podGroupReplicas-nodeLeafPodCount[v])
            podAnnotations[node.GetName()] = fmt.Sprintf("%d", nodeLeafPodCount[v]-podGroupReplicas)
        }
    }
    // PreScore的时候,判断pod group实例的个数m,nodeInfo所属POD组的node的个数n,k = n -m, k越小则约倾向于调度到该node,k < 0, 则万不得已不要调度到该节点
    // 将node对应k值记录到pod的annotation中
    pod.SetAnnotations(podAnnotations)
    return nil
}

// 归一化分数
func (n *NetworkSpeedPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    slices.SortFunc(scores, func(a, b framework.NodeScore) int {
        return int(a.Score - b.Score)
    })

    for i, score := range scores {
        if score.Score == 0 {
            continue
        }
        scores[i].Score = int64((len(scores) - i) * (100 / len(scores)))
    }

    klog.InfoS("NormalizeScore", "scores", scores)

    return nil
}

func (n *NetworkSpeedPlugin) ScoreExtensions() framework.ScoreExtensions {
    return n
}

func New(ctx context.Context, configuration runtime.Object, handle framework.Handle) (framework.Plugin, error) {
    var config Config
    if err := frameworkruntime.DecodeInto(configuration, &config); err != nil {
        return nil, err
    }

    if config.Selector == nil {
        return nil, errors.New("prober selector is nil")
    }

    if config.Timeout == 0 {
        config.Timeout = 3000
    }

    if config.Namespace == "" {
        data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
        if err != nil {
            return nil, fmt.Errorf("get namesapce error: %w", err)
        }

        config.Namespace = string(data)
    }

    if config.Port == 0 {
        config.Port = 9115
    }

    klog.InfoS("scheduler config", "config", config)

    return &NetworkSpeedPlugin{
        config: &config,
        handle: handle,
    }, nil
}

启动调度器

package main

import (
    "k8s-network-scheduler/plugins/networkspeed"
    "k8s.io/component-base/cli"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
    "os"
)

func main() {
    cmd := app.NewSchedulerCommand(
        app.WithPlugin(networkspeed.PluginName, networkspeed.New),
    )

    code := cli.Run(cmd)
    os.Exit(code)
}

三、调度器配置

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
profiles:
  - schedulerName: network-scheduler
    plugins:
      multiPoint:
        enabled:
          - name: NetworkSpeed
    pluginConfig:
      - name: NetworkSpeed
        args:
          selector:
            // 通过标签可以找到对应的插件应用程序
            app.kubernetes.io/instance: network-scheduler
            app.kubernetes.io/name: blackbox-exporter

配置参数介绍(可以配置不同的调度器以及插件权重)

https://kubernetes.io/zh-cn/docs/reference/scheduling/config/#multiple-profiles
https://blog.csdn.net/qq_24433609/article/details/130149592

四、通过extender对默认调度器进行扩展

apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
extenders:
- urlPrefix: http://localhost:8888/
  filterVerb: filter
  ignorable: true
  weight: 1
- urlPrefix: http://localhost:8890/
  filterVerb: filter
  prioritizeVerb: prioritize
  bindVerb: bind
  ignorable: false
  weight: 1

可以看到extender是通过webhook的方式对调度器进行扩展,这种方式最直接的问题是在调度时网络多一跳对性能有影响,对于多语言的状况是个使用场景。extender的方式在k8s 1.15版本推出scheduler framework的方式后即不再推荐,这边不再举例。

scheduler-plugins调研

scheduler-plugins 就是通过scheduler framework扩展的一系列插件,如:coscheduling(pod group),TopologicalSort (app group,会在QueueSort扩展点发挥作用),Noderesourcetopology(可以配置MostAllocated、BalancedAllocation、LeastAllocated、LeastNUMANodes)

部署

scheduler-plugins/manifests/install at master · kubernetes-sigs/scheduler-plugins (github.com)
Installation | Scheduler Plugins (k8s.io)

扩展

编写扩展插件,在启动文件里scheduler-plugins/cmd/scheduler/main.go引入扩展插件,重新编译

package main

import (
    "os"

    "k8s.io/component-base/cli"
    _ "k8s.io/component-base/metrics/prometheus/clientgo" // for rest client metric registration
    _ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    "sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
    "sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
    "sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
    "sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
    "sigs.k8s.io/scheduler-plugins/pkg/noderesources"
    "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology"
    "sigs.k8s.io/scheduler-plugins/pkg/podstate"
    "sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration"
    "sigs.k8s.io/scheduler-plugins/pkg/qos"
    "sigs.k8s.io/scheduler-plugins/pkg/sysched"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"

    // Ensure scheme package is initialized.
    _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)

func main() {
    // Register custom plugins to the scheduler framework.
    // Later they can consist of scheduler profile(s) and hence
    // used by various kinds of workloads.
    command := app.NewSchedulerCommand(
        app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
        app.WithPlugin(coscheduling.Name, coscheduling.New),
        app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
        app.WithPlugin(networkoverhead.Name, networkoverhead.New),
        app.WithPlugin(topologicalsort.Name, topologicalsort.New),
        app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
        app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
        app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
        app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
        app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
        app.WithPlugin(sysched.Name, sysched.New),
        // Sample plugins below.
        // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
        app.WithPlugin(podstate.Name, podstate.New),
        app.WithPlugin(qos.Name, qos.New),
    )

    code := cli.Run(command)
    os.Exit(code)
}

volcano调研

架构

image.png

可以看出,Volcano由scheduler、controllermanager、admission和vcctl组成:
scheduler通过一系列的action和plugin调度Job,并为它找到一个最适合的节点。与k8s本身的调度器相比,Volcano支持针对Job的多种调度算法。
controllermanager管理CRD资源的生命周期。它主要由Queue ControllerManager、PodGroupControllerManager 、 VCJob ControllerManager构成。
admission负责对CRD API资源进行校验。
vcctl是Volcano的命令行客户端工具。

部署

调度流程

enqueue


image.png

image.png

proportion 插件:Queue相关的的插件
sla插件(enqueue和allocate action)


image.png

allocate
image.png

image.png

image.png

image.png

image.png

rescheduler


image.png

metric
image.png

各调度器对比

功能项 \ 调度器 volcano scheduler-plugins 自定义扩展点 默认调度器
Gang调度 支持 支持 不支持 不支持
装箱调度 支持 支持 不支持 不支持
rescheduler 支持 不支持 不支持 不支持
app dependence 支持(vcjob支持,没有预留queue的扩展点) 支持(通过queueSort实现) 不支持 不支持
节点池 不支持 支持、可以通过多调度器+标签实现 不支持 不支持
自定义调度算法 支持 通过http extender方式对插件进行扩展(多语言友好) 扩展自定义action和plugin 支持 需要重新编译scheduler plugins 多语言支持不太友好(需要自定义接口转发,或者sdk库实现) 支持 不支持
节点拓扑(MostAllocated、BalancedAllocation、LeastAllocated) 支持 支持 不支持 不支持
多调度策略 支持不太好 -- volcano scheduler configmap配置全局调度的plugin -- plugin的参数可以配置在vcjob作业的annotation中单独生效 -- 支持配置muti scheduler -- vcjob中支持配置个性化的plugin(开启ssh免密,svc等) 支持,可以配置多调度器 支持,配置多调度器 不支持
并发调度 支持,可以通过定义Queue对资源大小,优先级限制防止资源冲突 支持,不建议做(在资源紧张情况下可能存在资源冲突问题。配合节点池可以考虑做) 支持,不建议做(在资源紧张情况下可能存在资源冲突问题) 不支持
技术生态 成熟 不太成熟,资料较少 成熟 成熟
云原生 Accepted to CNCF on April 9, 2020. Incubating Projects kubernetes sig小组发布,符合scheduler framework 原生, scheduler framework方式扩展 原生
其他 vcjob支持丰富的调度策略 -- 1. 配置超时时间 -- 2. 设置一级资源、二级资源 -- 3. 支持对接kubeflow、MPI、spark等主流的AI、HPC、大数据计算平台 -- 4. 支持rescheduler 支持多队列资源分配 volcano device plugin支持共享gpu显存 volcano metric
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容