背景
k8s 1.34中MutableCSINodeAllocatableCount特性达到了Beta阶段
解决了以前csidriver只在初始化时配置一个静态的maxvolume,但是节点运行期间此值是可能存在变化的,现在是周期刷新这个值
csidriver中设置,如下
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
name: demo.csi.k8s.io
spec:
nodeAllocatableUpdatePeriodSeconds: 30
源码
pkg/volume/csi/csi_plugin.go中
初始化csiNode
func initializeCSINode(host volume.VolumeHost, csiDriverInformer cache.SharedIndexInformer) error {
...
如果开启了MutableCSINodeAllocatableCount特性
if utilfeature.DefaultFeatureGate.Enabled(features.MutableCSINodeAllocatableCount) && csiNodeUpdaterVar == nil {
...
构建csiNodeUpdater
csiNodeUpdaterVar, err = NewCSINodeUpdater(csiDriverInformer)
...
启动csiNodeUpdater
go csiNodeUpdaterVar.Run()
...
...
}
pkg/volume/csi/csi_node_updater.go中
运行csiNodeUpdater
func (u *csiNodeUpdater) Run() {
u.once.Do(func() {
添加事件处理函数
_, err := u.driverInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: u.onDriverAdd,
UpdateFunc: u.onDriverUpdate,
DeleteFunc: u.onDriverDelete,
})
if err != nil {
klog.ErrorS(err, "Failed to add event handler for CSI driver informer")
return
}
klog.V(4).InfoS("csiNodeUpdater initialized successfully")
})
}
添加driver时的处理函数
func (u *csiNodeUpdater) onDriverAdd(obj interface{}) {
...
同步driverUpdater
u.syncDriverUpdater(driver.Name)
...
}
同步driverUpdater
func (u *csiNodeUpdater) syncDriverUpdater(driverName string) {
...
获取刷新周期
period := getNodeAllocatableUpdatePeriod(driver)
...
运行周期更新处理函数
go u.runPeriodicUpdate(driverName, period, newStopCh)
}
获取获取刷新周期
func getNodeAllocatableUpdatePeriod(driver *v1.CSIDriver) time.Duration {
if driver == nil || driver.Spec.NodeAllocatableUpdatePeriodSeconds == nil {
return 0
}
return time.Duration(*driver.Spec.NodeAllocatableUpdatePeriodSeconds) * time.Second
}
运行周期更新处理函数
func (u *csiNodeUpdater) runPeriodicUpdate(driverName string, period time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(period)
...
case <-ticker.C:
...
更新csidriver
if err := updateCSIDriver(driverName); err != nil {
}
pkg/volume/csi/csi_plugin.go中
func updateCSIDriver(pluginName string) error {
构建csidriver的client
csi, err := newCsiDriverClient(csiDriverName(pluginName))
if err != nil {
return fmt.Errorf("failed to create CSI client for driver %q: %w", pluginName, err)
}
...
调用接口获取nodeinfo
driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
if err != nil {
return fmt.Errorf("failed to get NodeGetInfo from driver %q: %w", pluginName, err)
}
更新csidriver信息
if err := nim.UpdateCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology); err != nil {
return fmt.Errorf("failed to update driver %q: %w", pluginName, err)
}
return nil
}
pkg/volume/csi/nodeinfomanager/nodeinfomanager.go中
更新csidriver
func (nim *nodeInfoManager) UpdateCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
更新csinode
err := nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
if err != nil {
return fmt.Errorf("error updating CSINode object with CSI driver node info: %w", err)
}
return nil
}
更新csinode
func (nim *nodeInfoManager) updateCSINode(
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
...
尝试更新csinode
if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
}
...
}
长时间更新csinode
func (nim *nodeInfoManager) tryUpdateCSINode(
csiKubeClient clientset.Interface,
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
...
获取csinode
nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
nodeInfo, err = nim.CreateCSINode()
}
if err != nil {
return err
}
安装driver到csinode
return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
}
安装driver到csinode
func (nim *nodeInfoManager) installDriverToCSINode(
nodeInfo *storagev1.CSINode,
driverName string,
driverNodeID string,
maxAttachLimit int64,
topology map[string]string) error {
...
构建driver spec
driverSpec := storagev1.CSINodeDriver{
Name: driverName,
NodeID: driverNodeID,
TopologyKeys: sets.List[string](topologyKeys),
}
...
设置csidriver的Allocatable
m := int32(maxAttachLimit)
driverSpec.Allocatable = &storagev1.VolumeNodeResources{Count: &m}
...
更新csinode添加csidriver
newDriverSpecs = append(newDriverSpecs, driverSpec)
nodeInfo.Spec.Drivers = newDriverSpecs
更新csinode
_, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
return err
}