从源码看csi driver如何注册

kubelet

入口代码

pkg/kubelet/kubelet.go

创建kubelet
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIPs []net.IP,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    registerNode bool,
    registerWithTaints []v1.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    nodeStatusMaxImages int32,
    seccompDefault bool,
) (*Kubelet, error) {
    ...
    创建pluginManager
    klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /* sockDir */
        kubeDeps.Recorder,
    )
    ...
}

pkg/kubelet/kubelet_getters.go

默认/var/lib/kubelet
func (kl *Kubelet) getRootDir() string {
    return kl.rootDirectory
}

默认/var/lib/kubelet/plugins_registry
func (kl *Kubelet) getPluginsRegistrationDir() string {
    return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsRegistrationDirName)
}

pkg/kubelet/kubelet.go中

启动kubelet
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
    ...
}

func (kl *Kubelet) updateRuntimeUp() {
...
    添加csi plugin handler
    kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
...
    初始化runtime依赖
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
...
}

func (kl *Kubelet) initializeRuntimeDependentModules() {
...
启动pluginmanager
    go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
...
}

pkg/kubelet/pluginmanager/plugin_manager.go中

创建poluginManager
func NewPluginManager(
    sockDir string,
    recorder record.EventRecorder) PluginManager {
    asw := cache.NewActualStateOfWorld()
    dsw := cache.NewDesiredStateOfWorld()
    reconciler := reconciler.NewReconciler(
        operationexecutor.NewOperationExecutor(
            operationexecutor.NewOperationGenerator(
                recorder,
            ),
        ),
        loopSleepDuration,
        dsw,
        asw,
    )

    pm := &pluginManager{
        desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
            sockDir,
            dsw,
        ),
        reconciler:          reconciler,
        desiredStateOfWorld: dsw,
        actualStateOfWorld:  asw,
    }
    return pm
}

启动pluginmanager
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
...
启动watcher
    if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
        klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
        return
    }
...
启动reconciler
    go pm.reconciler.Run(stopCh)
...
}

csi plugin处理方法

pkg/volume/csi/csi_plugin.go中

var PluginHandler = &RegistrationHandler{}

func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
...
    csiDrivers.Set(pluginName, Driver{
        endpoint:                endpoint,
        highestSupportedVersion: highestSupportedVersion,
    })
...
}

发现plugin

pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go中

创建watcher
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
    return &Watcher{
        path:                sockDir,
        fs:                  &utilfs.DefaultFs{},
        desiredStateOfWorld: desiredStateOfWorld,
    }
}

启动watch
func (w *Watcher) Start(stopCh <-chan struct{}) error {
...
    fsWatcher, err := fsnotify.NewWatcher()
    if err != nil {
        return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
    }
    w.fsWatcher = fsWatcher
    遍历plugin目录
    if err := w.traversePluginDir(w.path); err != nil {
        klog.ErrorS(err, "Failed to traverse plugin socket path", "path", w.path)
    }
    监听plugin目录事件
    go func(fsWatcher *fsnotify.Watcher) {
        for {
            select {
            case event := <-fsWatcher.Events:
                ...
                有新增事件
                    err := w.handleCreateEvent(event)
                ...
            }
        }
    }(fsWatcher)
...
}

func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
...
处理plugin注册
        return w.handlePluginRegistration(event.Name)
...
}

func (w *Watcher) handlePluginRegistration(socketPath string) error {
    ...
    添加plugin到dsw
    err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    ...
}

注册额plugin

pkg/kubelet/pluginmanager/reconciler/reconciler.go中

func NewReconciler(
    operationExecutor operationexecutor.OperationExecutor,
    loopSleepDuration time.Duration,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
    return &reconciler{
        operationExecutor:   operationExecutor,
        loopSleepDuration:   loopSleepDuration,
        desiredStateOfWorld: desiredStateOfWorld,
        actualStateOfWorld:  actualStateOfWorld,
        handlers:            make(map[string]cache.PluginHandler),
    }
}

func (rc *reconciler) Run(stopCh <-chan struct{}) {
    wait.Until(func() {
        rc.reconcile()
    },
        rc.loopSleepDuration,
        stopCh)
}

添加handler
func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
    rc.Lock()
    defer rc.Unlock()

    rc.handlers[pluginType] = pluginHandler
}

获取handler
func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
    rc.RLock()
    defer rc.RUnlock()

    var copyHandlers = make(map[string]cache.PluginHandler)
    for pluginType, handler := range rc.handlers {
        copyHandlers[pluginType] = handler
    }
    return copyHandlers
}


func (rc *reconciler) reconcile() {
...
获取期望注解的plugin
    for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
...
注册plugin
            err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
...
    }
...
}

pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go中

创建operationExecutor
func NewOperationExecutor(
    operationGenerator OperationGenerator) OperationExecutor {

    return &operationExecutor{
        pendingOperations:  goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
        operationGenerator: operationGenerator,
    }
}

注册plugin
func (oe *operationExecutor) RegisterPlugin(
    socketPath string,
    timestamp time.Time,
    pluginHandlers map[string]cache.PluginHandler,
    actualStateOfWorld ActualStateOfWorldUpdater) error {
    生成注册plugin方法
    generatedOperation :=
        oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
    执行方法
    return oe.pendingOperations.Run(
        socketPath, generatedOperation)
}

pkg/util/goroutinemap/goroutinemap.go中

创建GoRoutineMap
func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
    g := &goRoutineMap{
        operations:                make(map[string]operation),
        exponentialBackOffOnError: exponentialBackOffOnError,
    }

    g.cond = sync.NewCond(&g.lock)
    return g
}

执行operationFunc
func (grm *goRoutineMap) Run(
    operationName string,
    operationFunc func() error) error {
    grm.lock.Lock()
    defer grm.lock.Unlock()

    existingOp, exists := grm.operations[operationName]
    if exists {
        // Operation with name exists
        if existingOp.operationPending {
            return NewAlreadyExistsError(operationName)
        }

        if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
            return err
        }
    }

    grm.operations[operationName] = operation{
        operationPending: true,
        expBackoff:       existingOp.expBackoff,
    }
    go func() (err error) {
        // Handle unhandled panics (very unlikely)
        defer k8sRuntime.HandleCrash()
        // Handle completion of and error, if any, from operationFunc()
        defer grm.operationComplete(operationName, &err)
        // Handle panic, if any, from operationFunc()
        defer k8sRuntime.RecoverFromPanic(&err)
        return operationFunc()
    }()

    return nil
}

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go中

创建operationGenerator
func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {

    return &operationGenerator{
        recorder: recorder,
    }
}


生成注册操作
func (og *operationGenerator) GenerateRegisterPluginFunc(
    socketPath string,
    timestamp time.Time,
    pluginHandlers map[string]cache.PluginHandler,
    actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

    registerPluginFunc := func() error {
...

        handler, ok := pluginHandlers[infoResp.Type]
...

        if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
            return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
        }
...
        return nil
    }
    return registerPluginFunc
}

csi-node-driver-registrer

cmd/csi-node-driver-registrar/main.go中

func main() {
    ...
    节点注册
    nodeRegister(csiDriverName, addr)
    ...
}
func nodeRegister(csiDriverName, httpEndpoint string) {
    ...
    构建socketPath
    socketPath := buildSocketPath(csiDriverName)
    ...
    监听socketPath
    lis, err := net.Listen("unix", socketPath)
    if err != nil {
        klog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
        os.Exit(1)
    }
    ...
    创建grpc server
    grpcServer := grpc.NewServer()
    registerapi.RegisterRegistrationServer(grpcServer, registrar)

    启动http server
    go httpServer(socketPath, httpEndpoint, csiDriverName)
    退出清楚socketPath
    go removeRegSocket(csiDriverName)
    ...
    启动grpc server
    if err := grpcServer.Serve(lis); err != nil {
        klog.Errorf("Registration Server stopped serving: %v", err)
        os.Exit(1)
    }
    ...
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容