从k8s源码看Dynamic Resource Allocation

简单总结

dra plugin启动添加socket文件到/var/lib/kubelet/plugins_registry目录下
kubelet plugin watcher发现dra plugin后添加到draPlugins中
pod创建kubelet会指定到syncpod,其中会从draPlugins中获取dra plugin信息
根据plugin信息创建dra plugin client,调用NodePrepareResources接口完成动态资源分配

源码

注册dra plugin

pkg/kubelet/kubelet.go中

创建kubelet
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIPs []net.IP,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    podLogsDirectory string,
    imageCredentialProviderConfigFile string,
    imageCredentialProviderBinDir string,
    registerNode bool,
    registerWithTaints []v1.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    registerSchedulable bool,
    nodeLabels map[string]string,
    nodeStatusMaxImages int32,
    seccompDefault bool,
) (*Kubelet, error) {
    ...
    创建plugin manager
    klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /var/lib/kubelet/plugins_registry
        kubeDeps.Recorder,
    )
    ...
}

初始化依赖模块
func (kl *Kubelet) initializeRuntimeDependentModules() {
    ...
    调用container manager的方法获取PluginRegistrationHandler
    for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
        添加PluginRegistrationHandler
        kl.pluginManager.AddHandler(name, handler)
    }
    ...
}

pkg/kubelet/cm/container_manager_linux.go中

获取PluginRegistrationHandler
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
    ...
    调用device manager的方法获取PluginWatcherHandler
    if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
        res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
    }

    return res
}

pkg/kubelet/cm/dra/manager.go中

获取PluginWatcherHandler
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
    return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
}

pkg/kubelet/cm/dra/plugin/registration.go中

构建RegistrationHandler
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
    ...
    构建RegistrationHandler
    handler := &RegistrationHandler{
        // The context and thus logger should come from the caller.
        backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
        kubeClient:    kubeClient,
        getNode:       getNode,
    }
    ...
    return handler
}

注册plugin
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, supportedServices []string, pluginClientTimeout *time.Duration) error {
    ...
    添加plugin信息到draPlugins中,后续动态资源分配会从中获取plugin信息
    if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced {
        ...
    }
    ...
}

pkg/kubelet/pluginmanager/plugin_manager.go中

func NewPluginManager(
    sockDir string,
    recorder record.EventRecorder) PluginManager {
    ...
    reconciler := reconciler.NewReconciler(
        operationexecutor.NewOperationExecutor(
            operationexecutor.NewOperationGenerator(
                recorder,
            ),
        ),
        loopSleepDuration,
        dsw,
        asw,
    )

    pm := &pluginManager{
        desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
            sockDir,
            dsw,
        ),
        reconciler:          reconciler,
        desiredStateOfWorld: dsw,
        actualStateOfWorld:  asw,
    }
    ...
}

启动plugin manager
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    ...
    启动dswp
    if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
        klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
        return
    }
    ...
    启动reconciler
    go pm.reconciler.Run(stopCh)
    ...
}

添加handler
func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
    调用reconciler的方法添加handler
    pm.reconciler.AddHandler(pluginType, handler)
}

pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go中

构建plugin watcher
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
    return &Watcher{
        path:                sockDir,
        fs:                  &utilfs.DefaultFs{},
        desiredStateOfWorld: desiredStateOfWorld,
    }
}

启动plugin watcher
func (w *Watcher) Start(stopCh <-chan struct{}) error {
    ...
    处理新增文件和目录
                    err := w.handleCreateEvent(event)
    ...
}

处理新增文件和目录
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
    ...
    处理新增插件
        return w.handlePluginRegistration(event.Name)
    ...
}

处理新增插件
func (w *Watcher) handlePluginRegistration(socketPath string) error {
    ...
    plugin socketPath添加到dsw
    err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
    ...
}

pkg/kubelet/pluginmanager/reconciler/reconciler.go中

构建reconciler
func NewReconciler(
    operationExecutor operationexecutor.OperationExecutor,
    loopSleepDuration time.Duration,
    desiredStateOfWorld cache.DesiredStateOfWorld,
    actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
    return &reconciler{
        operationExecutor:   operationExecutor,
        loopSleepDuration:   loopSleepDuration,
        desiredStateOfWorld: desiredStateOfWorld,
        actualStateOfWorld:  actualStateOfWorld,
        handlers:            make(map[string]cache.PluginHandler),
    }
}

添加reconciler
func (rc *reconciler) Run(stopCh <-chan struct{}) {
    每秒执行一次reconcile
    wait.Until(func() {
        rc.reconcile()
    },
        rc.loopSleepDuration,
        stopCh)
}

添加plugin handler
func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
    rc.Lock()
    defer rc.Unlock()

    rc.handlers[pluginType] = pluginHandler
}

获取plugin handler
func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
    rc.RLock()
    defer rc.RUnlock()

    var copyHandlers = make(map[string]cache.PluginHandler)
    for pluginType, handler := range rc.handlers {
        copyHandlers[pluginType] = handler
    }
    return copyHandlers
}

执行plugin register/deregister
func (rc *reconciler) reconcile() {
    ...
    遍历dsw待register的plugin
    for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
        ...
        注册plugin
        err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
        ...
    }
    ...
}

准备动态资源

pkg/kubelet/kuberuntime/kuberuntime_manager.go中

同步pod到期望的状态
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    ...
    调用kubelet的方法准备动态资源
    if err := m.runtimeHelper.PrepareDynamicResources(ctx, pod); err != nil {
    ...
}

pkg/kubelet/kubelet.go中

准备动态资源
func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
    调度containermanager的方法准备动态资源
    return kl.containerManager.PrepareDynamicResources(ctx, pod)
}

pkg/kubelet/cm/container_manager_linux.go中

准备动态资源
func (cm *containerManagerImpl) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
    调用dra manager的方法准备动态资源
    return cm.draManager.PrepareResources(ctx, pod)
}

pkg/kubelet/cm/dra/manager.go中

准备动态资源
func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
    准备动态资源
    err := m.prepareResources(ctx, pod)
    return err
}

func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
    ...
    for driverName, claims := range batches {
        构建dra plugin client
        client, err := dra.NewDRAPluginClient(driverName)
        ...
        通过dra plugin client调用plugin server的NodePrepareResources方法,准备动态资源
        response, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})

    }
    ...
}

pkg/kubelet/cm/dra/plugin/plugin.go中

构建dra plugin client
func NewDRAPluginClient(pluginName string) (*Plugin, error) {
    ...
    获取dra plugin信息
    existingPlugin := draPlugins.get(pluginName)
    ...
    return existingPlugin, nil
}

准备动态资源
func (p *Plugin) NodePrepareResources(
    ctx context.Context,
    req *drapbv1beta1.NodePrepareResourcesRequest,
    opts ...grpc.CallOption,
) (*drapbv1beta1.NodePrepareResourcesResponse, error) {
    ...
    获取dra plugin grpc conn
    conn, err := p.getOrCreateGRPCConn()
    if err != nil {
        return nil, err
    }
    ...
    调用dra plugin server的NodePrepareResources方法,准备动态资源
        nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
        response, err = nodeClient.NodePrepareResources(ctx, req)
    ...
}

获取dra plugin grpc conn
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
    ...
    和dra plugin server建立连接
    conn, err := grpc.Dial(
        p.endpoint,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
            return (&net.Dialer{}).DialContext(ctx, network, target)
        }),
        grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)),
    )
    ...
}

创建

pkg/controller/resourceclaim/controller.go中

resourceclaim 控制器
func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
    ...
    根据resourceclaim template创建resourceclaim对象
        claim = &resourceapi.ResourceClaim{
            ObjectMeta: metav1.ObjectMeta{
                GenerateName: generateName,
                OwnerReferences: []metav1.OwnerReference{
                    {
                        APIVersion:         "v1",
                        Kind:               "Pod",
                        Name:               pod.Name,
                        UID:                pod.UID,
                        Controller:         &isTrue,
                        BlockOwnerDeletion: &isTrue,
                    },
                },
                Annotations: annotations,
                Labels:      template.Spec.ObjectMeta.Labels,
            },
            Spec: template.Spec.Spec,
        }
    ...
    创建resourceclaim
        claim, err = ec.kubeClient.ResourceV1beta1().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
    ...
}

调度

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go中

预过滤节点
func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   ...
   构建dra allocator
        allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
        if err != nil {
            return nil, statusError(logger, err)
        }
        s.allocator = allocator
   ...
}


过滤节点
func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    ...
    分配资源
    a, err := state.allocator.Allocate(allocCtx, node)
    ...
}

预留资源
func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
    ...
    更新缓存,resourceclaim即将在Bind节点进行分配
    err := pl.draManager.ResourceClaims().SignalClaimPendingAllocation(claim.UID, claim)
    ...
}

预绑定节点
func (pl *DynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    ,,,
    更新resourceclaim状态,如ReservedFor,Allocation等
    claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
    ,,,
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容