CRD在AI部署中的应用

概述

CRD(CustomResourceDefinition,自定义资源定义)是 MLOps 部署中常用的关键技术,值得重点学习和研究。

CRD定义

下面先定义一个 CRD(类似数据库的表结构)。由于没有用 code-generator 为它生成类型化的 clientset,后文的控制器通过 dynamic client 来访问该资源:

# virtualmachines-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # name must match the spec fields below, in the form <plural>.<group>
  name: virtualmachines.cloud.waibizi.com
spec:
  # group is used in REST API paths: /apis/<group>/<version>
  group: cloud.waibizi.com
  versions:
  - name: v1 # version name, e.g. v1, v2beta1
    served: true  # serve this version via REST at /apis/<group>/<version>/...
    storage: true  # exactly one version must be marked as the storage version
    schema:  # validation schema for objects of this version
      openAPIV3Schema:
        description: Define virtualMachine YAML Spec
        type: object
        properties:
          # fields of the custom resource
          spec:
            type: object
            properties:
              uuid:
                type: string
              name:
                type: string
              image:
                type: string
              memory:
                type: integer
              disk:
                type: integer
              status:
                type: string
  # scope: Namespaced (objects live in a namespace) or Cluster (cluster-wide objects)
  scope: Namespaced
  names:
    # kind is the CamelCase type name used in the `kind` field of manifests
    kind: VirtualMachine
    # plural is used in REST API paths: /apis/<group>/<version>/<plural>
    plural: virtualmachines
    # singular is the lowercase singular alias used by kubectl and for display
    singular: virtualmachine
    # shortNames enable abbreviations such as `kubectl get vm` (like `po` for pods)
    shortNames:
    - vm

创建一个该类型的 CR 实例(类似数据库中的一条记录):

# public-wx.yaml
# A sample VirtualMachine object. metadata.namespace is not set, so it is created
# in whatever namespace kubectl targets. NOTE(review): the controller in main.go
# only watches "default" — confirm the object lands there.
# spec.status "creating" is the value the controller acts on; it patches it to "complete".
apiVersion: "cloud.waibizi.com/v1"
kind: VirtualMachine
metadata:
  name: public-wx
spec:
  uuid: "2c4789b2-30f2-4d31-ab71-ca115ea8c199"
  name: "waibizi-wx-virtual-machine"
  image: "Centos-7.9"
  memory: 4096  # unit not stated anywhere — presumably MiB, confirm
  disk: 500     # unit not stated anywhere — presumably GiB, confirm
  status: "creating"

Operator实战

初始化项目并拉取依赖

mkdir dynamic-project && cd dynamic-project
go mod init dynamic-project
go get k8s.io/client-go@v0.22.1
go get k8s.io/apimachinery@v0.22.1
# vm_controller.go also imports k8s.io/api/core/v1 (pinned here to match client-go)
# and the klog v1 module k8s.io/klog, which client-go v0.22 does not depend on
# (it uses k8s.io/klog/v2), so both are fetched explicitly.
go get k8s.io/api@v0.22.1
go get k8s.io/klog@v1.0.0
# Once the source files below are written, reconcile go.mod/go.sum with:
#   go mod tidy

按如下结构新建目录与文件:

tree
.
├── go.mod
├── go.sum
├── main.go
└── pkg
    └── apis
        ├── v1
        │   └── type.go
        └── vm_controller.go

3 directories, 5 files

main.go

package main

import (
    "dynamic-project/pkg/apis"
    "flag"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
    "k8s.io/klog"
    "path/filepath"
)

func main() {
    var err error
    var config *rest.Config
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
    }
    flag.Parse()
    if config, err = rest.InClusterConfig(); err != nil {
        // 使用 KubeConfig 文件创建集群配置 Config 对象
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
            panic(err.Error())
        }
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        return
    }
    dynamicSharedInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, "default", nil)
    // 初始化controller,传入client、informer, 
    controller := apis.NewController(dynamicClient, dynamicSharedInformerFactory)
    // 直接启动controller
    err = controller.Run()
    if err != nil {
        klog.Errorln("fail run controller")
        return
    }
}

type.go

package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// VMGVR 定义资源的GVR,以供dynamic client识别资源
// VMGVR identifies the VirtualMachine custom resource (group cloud.waibizi.com,
// version v1, plural resource name virtualmachines). The dynamic client and
// dynamic informers address the resource through it, since no typed
// clientset exists.
var VMGVR = schema.GroupVersion{Group: "cloud.waibizi.com", Version: "v1"}.WithResource("virtualmachines")

// VirtualMachine is the Go representation of the VirtualMachine custom
// resource declared by the CRD (group cloud.waibizi.com, version v1).
// The controller populates it by JSON-decoding the unstructured objects
// held in the dynamic informer's cache.
type VirtualMachine struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec              VMSpec `json:"spec"`
}

// VMSpec mirrors the properties listed under spec in the CRD's
// openAPIV3Schema. Status is used as a lifecycle marker stored in spec:
// the controller acts on "creating" and patches it to "complete".
// NOTE(review): the units of Memory and Disk are not stated anywhere —
// presumably MiB and GiB respectively; confirm.
type VMSpec struct {
    UUID   string `json:"uuid"`
    Name   string `json:"name"`
    Image  string `json:"image"`
    Memory int    `json:"memory"`
    Disk   int    `json:"disk"`
    Status string `json:"status"`
}

vm_controller.go

package apis

import (
    "context"
    v1 "dynamic-project/pkg/apis/v1"
    "encoding/json"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/dynamic/dynamiclister"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"
    "time"
)

// stopCh is the stop channel handed to the informer and worker goroutines
// started by Run; Run assigns it with make. No code visible here ever
// closes it, so the controller runs until the process exits.
var stopCh chan struct{}

// controllerAgentName is the component name attached to events emitted by
// the controller's EventRecorder.
const controllerAgentName = "vm-dynamic-controller"

// Controller reconciles VirtualMachine custom resources through the dynamic
// client. It uses a shared informer as a local cache and a rate-limited work
// queue of "namespace/name" keys.
type Controller struct {
    // dynamicClient issues API calls (e.g. the merge patch in Creating).
    dynamicClient dynamic.Interface
    // workqueue holds object keys added by enqueueFoo and drained by the workers.
    workqueue     workqueue.RateLimitingInterface
    // vmSynced reports whether the informer has finished its initial list; Run waits on it.
    vmSynced      cache.InformerSynced
    // vmInformer is the informer for v1.VMGVR obtained from the factory.
    vmInformer    cache.SharedIndexInformer
    // vmLister reads VirtualMachine objects from vmInformer's indexer.
    vmLister      dynamiclister.Lister
    // recorder emits events. NOTE(review): not used by any code shown here.
    recorder      record.EventRecorder
}

// NewController builds a Controller for the VirtualMachine resource
// (v1.VMGVR) on top of the given dynamic client and informer factory.
//
// The informer enqueues the object's "namespace/name" key whenever a
// VirtualMachine is added or updated. Run must be called to start the
// informer and the workers.
func NewController(
    dynamicClient dynamic.Interface,
    factory dynamicinformer.DynamicSharedInformerFactory) *Controller {
    informer := factory.ForResource(v1.VMGVR).Informer()

    // Events are only written to the structured log; no API-server sink is configured.
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartStructuredLogging(0)
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        workqueue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "virtualmachines"),
        vmSynced:      informer.HasSynced,
        vmInformer:    informer,
        vmLister:      dynamiclister.New(informer.GetIndexer(), v1.VMGVR),
        recorder:      recorder,
        dynamicClient: dynamicClient,
    }

    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        // Reconcile updates as well, so that setting spec.status back to
        // "creating" on an existing object triggers another pass. The
        // controller's own status patch also arrives here; syncHandler treats
        // "complete" as a no-op.
        UpdateFunc: func(oldObj, newObj interface{}) {
            controller.enqueueFoo(newObj)
        },
    })

    return controller
}

// enqueueFoo computes the "namespace/name" cache key for obj and adds it to
// the work queue. If the key cannot be derived, the error is reported via
// utilruntime.HandleError and the object is dropped.
func (c *Controller) enqueueFoo(obj interface{}) {
    key, keyErr := cache.MetaNamespaceKeyFunc(obj)
    if keyErr != nil {
        utilruntime.HandleError(keyErr)
        return
    }
    c.workqueue.Add(key)
}

// Run starts the informer, waits for its cache to sync, then starts a worker
// goroutine and blocks until stopCh is closed. The work queue is shut down
// when Run returns.
//
// It returns an error if the informer cache does not sync.
func (c *Controller) Run() error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    // Allocate the stop channel before any goroutine receives it. Passing it
    // earlier would hand the goroutines a nil channel that never signals.
    stopCh = make(chan struct{})
    go c.vmInformer.Run(stopCh)

    // Workers read from the informer's indexer, so only start them once the
    // initial list has populated the cache.
    klog.Info("Waiting for informer caches to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.vmSynced); !ok {
        return fmt.Errorf("informer caches did not sync")
    }

    // One worker goroutine; wait.Until restarts runWorker every second if it returns.
    go wait.Until(c.runWorker, time.Second, stopCh)

    <-stopCh
    klog.Infoln("Shutting down workers")
    return nil
}

// worker就是一个循环不断调用processNextWorkItem
// runWorker drains the work queue one item at a time. It returns once
// processNextWorkItem reports that the queue has been shut down.
func (c *Controller) runWorker() {
    for {
        if !c.processNextWorkItem() {
            return
        }
    }
}

// processNextWorkItem takes one key off the work queue and reconciles it via
// syncHandler.
//
// Keys whose sync fails are re-added with rate limiting. Keys that succeed,
// and items that are not strings, are forgotten so their backoff resets.
// It returns false only when the queue has been shut down.
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    // Every item obtained from Get must be marked Done, otherwise the queue
    // will never hand the same key out again.
    defer c.workqueue.Done(obj)

    key, ok := obj.(string)
    if !ok {
        c.workqueue.Forget(obj)
        utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
        return true
    }

    if err := c.syncHandler(key); err != nil {
        // Retry later with backoff.
        c.workqueue.AddRateLimited(key)
        utilruntime.HandleError(fmt.Errorf("error syncing '%s': %w, requeuing", key, err))
        return true
    }

    c.workqueue.Forget(obj)
    // Infof (not Infoln) so that %s is actually formatted.
    klog.Infof("Successfully synced '%s'", key)
    return true
}

// syncHandler reconciles the VirtualMachine identified by key ("namespace/name").
//
// It reads the object from the informer's local cache and decodes it into
// v1.VirtualMachine. When spec.status is "creating", it hands the object to
// Creating.
//
// A non-nil return makes the caller requeue key with rate limiting. For
// conditions that can never succeed on retry (a malformed key, an object
// that has already been deleted), the function reports or logs the problem
// and returns nil instead.
func (c *Controller) syncHandler(key string) error {
    if _, _, err := cache.SplitMetaNamespaceKey(key); err != nil {
        // A malformed key will never become valid; report it and drop it.
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // Read from the same indexer that backs c.vmLister. Querying it directly
    // distinguishes "object deleted" (!exists) from a real cache error,
    // instead of ignoring the lookup error.
    obj, exists, err := c.vmInformer.GetIndexer().GetByKey(key)
    if err != nil {
        return fmt.Errorf("reading %q from informer cache: %w", key, err)
    }
    if !exists {
        klog.Infof("VirtualMachine '%s' no longer exists, skipping", key)
        return nil
    }

    // Round-trip through JSON to turn the cached unstructured object into the typed struct.
    raw, err := json.Marshal(obj)
    if err != nil {
        return fmt.Errorf("encoding %q: %w", key, err)
    }
    vm := &v1.VirtualMachine{}
    if err := json.Unmarshal(raw, vm); err != nil {
        return fmt.Errorf("decoding %q into VirtualMachine: %w", key, err)
    }

    if vm.Spec.Status != "creating" {
        return nil
    }
    return c.Creating(vm)
}

// Creating handles a VirtualMachine whose spec.status is "creating".
//
// This demo assumes provisioning always succeeds and simply merge-patches
// spec.status to "complete". A real implementation would provision the VM
// here, and on failure either record a failure status or return an error so
// the key is requeued.
//
// The patch is sent to the object's own namespace. "default" is used only
// when vm carries no namespace.
func (c *Controller) Creating(vm *v1.VirtualMachine) error {
    namespace := vm.Namespace
    if namespace == "" {
        namespace = "default"
    }
    patchData := []byte(`{"spec": {"status": "complete"}}`)
    _, err := c.dynamicClient.Resource(v1.VMGVR).Namespace(namespace).Patch(
        context.Background(), vm.Name, types.MergePatchType, patchData, metav1.PatchOptions{})
    if err != nil {
        // The caller logs and requeues; wrapping adds which object failed.
        return fmt.Errorf("patching VirtualMachine %s/%s status to complete: %w", namespace, vm.Name, err)
    }
    return nil
}


©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容