从源码看MutableCSINodeAllocatableCount特性

背景

k8s 1.34中MutableCSINodeAllocatableCount特性达到了Beta阶段
它解决了这样一个问题:以前CSI驱动的maxvolume(节点上可挂载的最大卷数)只在初始化时上报一次,是一个静态值,但节点运行期间该值可能发生变化;开启该特性后,这个值会被周期性地刷新

在CSIDriver对象的spec中通过nodeAllocatableUpdatePeriodSeconds设置刷新周期(单位为秒),如下

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: demo.csi.k8s.io
spec:
  nodeAllocatableUpdatePeriodSeconds: 30

源码

pkg/volume/csi/csi_plugin.go中

初始化csiNode
func initializeCSINode(host volume.VolumeHost, csiDriverInformer cache.SharedIndexInformer) error {
    ...
    如果开启了MutableCSINodeAllocatableCount特性
    if utilfeature.DefaultFeatureGate.Enabled(features.MutableCSINodeAllocatableCount) && csiNodeUpdaterVar == nil {
        ...
            构建csiNodeUpdater
            csiNodeUpdaterVar, err = NewCSINodeUpdater(csiDriverInformer)
            ...
            启动csiNodeUpdater
                go csiNodeUpdaterVar.Run()
        ...
    ...
}

pkg/volume/csi/csi_node_updater.go中

运行csiNodeUpdater
// Run registers the add/update/delete handlers (onDriverAdd, onDriverUpdate,
// onDriverDelete) on the updater's CSIDriver informer.
//
// The registration is wrapped in u.once.Do (presumably a sync.Once), so
// calling Run more than once does not register the handlers again.
// If AddEventHandler returns an error, it is logged and the closure returns
// early without emitting the "initialized successfully" message; Run itself
// has no return value, so the caller is not told about the failure.
func (u *csiNodeUpdater) Run() {
    u.once.Do(func() {
        添加事件处理函数
        _, err := u.driverInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    u.onDriverAdd,
            UpdateFunc: u.onDriverUpdate,
            DeleteFunc: u.onDriverDelete,
        })
        if err != nil {
            klog.ErrorS(err, "Failed to add event handler for CSI driver informer")
            return
        }
        klog.V(4).InfoS("csiNodeUpdater initialized successfully")
    })
}


添加driver时的处理函数
func (u *csiNodeUpdater) onDriverAdd(obj interface{}) {
    ...
    同步driverUpdater
    u.syncDriverUpdater(driver.Name)
    ...
}

同步driverUpdater
func (u *csiNodeUpdater) syncDriverUpdater(driverName string) {
    ...
    获取刷新周期
    period := getNodeAllocatableUpdatePeriod(driver)
    ...
    运行周期更新处理函数
    go u.runPeriodicUpdate(driverName, period, newStopCh)
}


获取刷新周期
// getNodeAllocatableUpdatePeriod converts the driver's
// Spec.NodeAllocatableUpdatePeriodSeconds into a time.Duration.
//
// It returns 0 when driver is nil or when the field is unset (nil pointer);
// otherwise it returns the configured number of seconds as a duration.
//
// NOTE(review): runPeriodicUpdate passes its period straight to
// time.NewTicker, which panics on a non-positive duration, so the caller
// presumably filters out the 0 result first. That guard is not visible in
// this excerpt; confirm against the full source.
func getNodeAllocatableUpdatePeriod(driver *v1.CSIDriver) time.Duration {
    if driver == nil || driver.Spec.NodeAllocatableUpdatePeriodSeconds == nil {
        return 0
    }
    return time.Duration(*driver.Spec.NodeAllocatableUpdatePeriodSeconds) * time.Second
}



运行周期更新处理函数
func (u *csiNodeUpdater) runPeriodicUpdate(driverName string, period time.Duration, stopCh <-chan struct{}) {
    ticker := time.NewTicker(period)
    ...
        case <-ticker.C:
        ...
        更新csidriver
            if err := updateCSIDriver(driverName); err != nil {

}


pkg/volume/csi/csi_plugin.go中

func updateCSIDriver(pluginName string) error {
    构建csidriver的client
    csi, err := newCsiDriverClient(csiDriverName(pluginName))
    if err != nil {
        return fmt.Errorf("failed to create CSI client for driver %q: %w", pluginName, err)
    }

    ...
    调用接口获取nodeinfo
    driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
    if err != nil {
        return fmt.Errorf("failed to get NodeGetInfo from driver %q: %w", pluginName, err)
    }
    更新csidriver信息
    if err := nim.UpdateCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology); err != nil {
        return fmt.Errorf("failed to update driver %q: %w", pluginName, err)
    }
    return nil
}

pkg/volume/csi/nodeinfomanager/nodeinfomanager.go中

更新csidriver
// UpdateCSIDriver forwards the driver's name, node ID, attach limit and
// topology to nim.updateCSINode, which updates the CSINode object.
//
// It returns nil on success. On failure the underlying error is wrapped
// with %w, so callers can still inspect it with errors.Is / errors.As.
func (nim *nodeInfoManager) UpdateCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
    更新csinode
    err := nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
    if err != nil {
        return fmt.Errorf("error updating CSINode object with CSI driver node info: %w", err)
    }
    return nil
}

更新csinode
func (nim *nodeInfoManager) updateCSINode(
    driverName string,
    driverNodeID string,
    maxAttachLimit int64,
    topology map[string]string) error {
    ...
    尝试更新csinode
        if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
            updateErrs = append(updateErrs, err)
            return false, nil
        }
    ...
}

尝试更新csinode
func (nim *nodeInfoManager) tryUpdateCSINode(
    csiKubeClient clientset.Interface,
    driverName string,
    driverNodeID string,
    maxAttachLimit int64,
    topology map[string]string) error {
    ...
    获取csinode
    nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
    if nodeInfo == nil || errors.IsNotFound(err) {
        nodeInfo, err = nim.CreateCSINode()
    }
    if err != nil {
        return err
    }
    安装driver到csinode
    return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
}

安装driver到csinode
func (nim *nodeInfoManager) installDriverToCSINode(
    nodeInfo *storagev1.CSINode,
    driverName string,
    driverNodeID string,
    maxAttachLimit int64,
    topology map[string]string) error {
    ...
    构建driver spec
    driverSpec := storagev1.CSINodeDriver{
        Name:         driverName,
        NodeID:       driverNodeID,
        TopologyKeys: sets.List[string](topologyKeys),
    }
    ...
    设置csidriver的Allocatable
        m := int32(maxAttachLimit)
        driverSpec.Allocatable = &storagev1.VolumeNodeResources{Count: &m}
    ...
    更新csinode添加csidriver
    newDriverSpecs = append(newDriverSpecs, driverSpec)
    nodeInfo.Spec.Drivers = newDriverSpecs
    更新csinode
    _, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
    return err
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容