简单总结
dra plugin启动添加socket文件到/var/lib/kubelet/plugins_registry目录下
kubelet plugin watcher发现dra plugin后添加到draPlugins中
pod创建时kubelet会执行到SyncPod,其中会从draPlugins中获取dra plugin信息
根据plugin信息创建dra plugin client,调用NodePrepareResources接口完成动态资源分配
源码
注册dra plugin
pkg/kubelet/kubelet.go中
创建kubelet
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
podLogsDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
registerSchedulable bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
...
创建plugin manager
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), // /var/lib/kubelet/plugins_registry
kubeDeps.Recorder,
)
...
}
初始化依赖模块
func (kl *Kubelet) initializeRuntimeDependentModules() {
...
调用container manager的方法获取PluginRegistrationHandler
for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
添加PluginRegistrationHandler
kl.pluginManager.AddHandler(name, handler)
}
...
}
pkg/kubelet/cm/container_manager_linux.go中
获取PluginRegistrationHandler
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
...
调用dra manager的方法获取PluginWatcherHandler
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
}
return res
}
pkg/kubelet/cm/dra/manager.go中
获取PluginWatcherHandler
// GetWatcherHandler returns the plugin handler for DRA plugin registration.
// Each call builds a new registration handler from the manager's kube client
// and node getter and exposes it as a cache.PluginHandler.
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
}
pkg/kubelet/cm/dra/plugin/registration.go中
构建RegistrationHandler
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
...
构建RegistrationHandler
handler := &RegistrationHandler{
// The context and thus logger should come from the caller.
backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
kubeClient: kubeClient,
getNode: getNode,
}
...
return handler
}
注册plugin
func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, supportedServices []string, pluginClientTimeout *time.Duration) error {
...
添加plugin信息到draPlugins中,后续动态资源分配会从中获取plugin信息
if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced {
...
}
...
}
pkg/kubelet/pluginmanager/plugin_manager.go中
func NewPluginManager(
sockDir string,
recorder record.EventRecorder) PluginManager {
...
reconciler := reconciler.NewReconciler(
operationexecutor.NewOperationExecutor(
operationexecutor.NewOperationGenerator(
recorder,
),
),
loopSleepDuration,
dsw,
asw,
)
pm := &pluginManager{
desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
sockDir,
dsw,
),
reconciler: reconciler,
desiredStateOfWorld: dsw,
actualStateOfWorld: asw,
}
...
}
启动plugin manager
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
...
启动dswp
if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
return
}
...
启动reconciler
go pm.reconciler.Run(stopCh)
...
}
添加handler
func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
调用reconciler的方法添加handler
pm.reconciler.AddHandler(pluginType, handler)
}
pkg/kubelet/pluginmanager/pluginwatcher/plugin_watcher.go中
构建plugin watcher
// NewWatcher returns a Watcher for the directory sockDir.
// The watcher uses the default filesystem implementation and is wired to the
// supplied desired state of world cache.
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
return &Watcher{
path: sockDir,
fs: &utilfs.DefaultFs{},
desiredStateOfWorld: desiredStateOfWorld,
}
}
启动plugin watcher
func (w *Watcher) Start(stopCh <-chan struct{}) error {
...
处理新增文件和目录
err := w.handleCreateEvent(event)
...
}
处理新增文件和目录
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
...
处理新增插件
return w.handlePluginRegistration(event.Name)
...
}
处理新增插件
func (w *Watcher) handlePluginRegistration(socketPath string) error {
...
plugin socketPath添加到dsw
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
...
}
pkg/kubelet/pluginmanager/reconciler/reconciler.go中
构建reconciler
// NewReconciler returns a Reconciler built from the given operation executor,
// loop sleep duration, and desired/actual state of world caches.
// The handler map starts out empty; handlers are registered through AddHandler.
func NewReconciler(
operationExecutor operationexecutor.OperationExecutor,
loopSleepDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
return &reconciler{
operationExecutor: operationExecutor,
loopSleepDuration: loopSleepDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
handlers: make(map[string]cache.PluginHandler),
}
}
启动reconciler
func (rc *reconciler) Run(stopCh <-chan struct{}) {
每秒执行一次reconcile
wait.Until(func() {
rc.reconcile()
},
rc.loopSleepDuration,
stopCh)
}
添加plugin handler
// AddHandler stores pluginHandler under pluginType, replacing any handler
// previously stored for that type. The map write happens under the
// reconciler's write lock.
func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
rc.Lock()
defer rc.Unlock()
rc.handlers[pluginType] = pluginHandler
}
获取plugin handler
// getHandlers returns a shallow copy of the registered handlers.
// The copy is made under the read lock, so the returned map can be used by
// the caller after the lock has been released.
func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
rc.RLock()
defer rc.RUnlock()
var copyHandlers = make(map[string]cache.PluginHandler)
for pluginType, handler := range rc.handlers {
copyHandlers[pluginType] = handler
}
return copyHandlers
}
执行plugin register/deregister
func (rc *reconciler) reconcile() {
...
遍历dsw待register的plugin
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
...
注册plugin
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
...
}
...
}
准备动态资源
pkg/kubelet/kuberuntime/kuberuntime_manager.go中
同步pod到期望的状态
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
...
调用kubelet的方法准备动态资源
if err := m.runtimeHelper.PrepareDynamicResources(ctx, pod); err != nil {
...
}
pkg/kubelet/kubelet.go中
准备动态资源
func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
调用container manager的方法准备动态资源
return kl.containerManager.PrepareDynamicResources(ctx, pod)
}
pkg/kubelet/cm/container_manager_linux.go中
准备动态资源
func (cm *containerManagerImpl) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
调用dra manager的方法准备动态资源
return cm.draManager.PrepareResources(ctx, pod)
}
pkg/kubelet/cm/dra/manager.go中
准备动态资源
func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
准备动态资源
err := m.prepareResources(ctx, pod)
return err
}
func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
...
for driverName, claims := range batches {
构建dra plugin client
client, err := dra.NewDRAPluginClient(driverName)
...
通过dra plugin client调用plugin server的NodePrepareResources方法,准备动态资源
response, err := client.NodePrepareResources(ctx, &drapb.NodePrepareResourcesRequest{Claims: claims})
}
...
}
pkg/kubelet/cm/dra/plugin/plugin.go中
构建dra plugin client
func NewDRAPluginClient(pluginName string) (*Plugin, error) {
...
获取dra plugin信息
existingPlugin := draPlugins.get(pluginName)
...
return existingPlugin, nil
}
准备动态资源
func (p *Plugin) NodePrepareResources(
ctx context.Context,
req *drapbv1beta1.NodePrepareResourcesRequest,
opts ...grpc.CallOption,
) (*drapbv1beta1.NodePrepareResourcesResponse, error) {
...
获取dra plugin grpc conn
conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
...
调用dra plugin server的NodePrepareResources方法,准备动态资源
nodeClient := drapbv1beta1.NewDRAPluginClient(conn)
response, err = nodeClient.NodePrepareResources(ctx, req)
...
}
获取dra plugin grpc conn
func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
...
和dra plugin server建立连接
conn, err := grpc.Dial(
p.endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)),
)
...
}
创建
pkg/controller/resourceclaim/controller.go中
resourceclaim 控制器
func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
...
根据resourceclaim template创建resourceclaim对象
claim = &resourceapi.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
GenerateName: generateName,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &isTrue,
BlockOwnerDeletion: &isTrue,
},
},
Annotations: annotations,
Labels: template.Spec.ObjectMeta.Labels,
},
Spec: template.Spec.Spec,
}
...
创建resourceclaim
claim, err = ec.kubeClient.ResourceV1beta1().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
...
}
调度
pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go中
预过滤节点
func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
...
构建dra allocator
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
if err != nil {
return nil, statusError(logger, err)
}
s.allocator = allocator
...
}
过滤节点
func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
...
分配资源
a, err := state.allocator.Allocate(allocCtx, node)
...
}
预留资源
func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
...
更新缓存,标记resourceclaim即将在绑定阶段(PreBind)进行分配
err := pl.draManager.ResourceClaims().SignalClaimPendingAllocation(claim.UID, claim)
...
}
预绑定节点
func (pl *DynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
...
更新resourceclaim状态,如ReservedFor,Allocation等
claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
...
}