简单总结
device plugin通过kubelet devicemanager的plugin socket注册到devicemanager的plugin server
devicemanager的plugin server根据注册请求中的信息(ResourceName、Endpoint)作为client连接device plugin自身的gRPC server(下文称plugin server)
连接plugin server后获取device信息(ListAndWatch),通过连接分配资源(Allocate)等
通过与plugin server的连接获取到资源信息后,kubelet通过node status将其上报到apiserver,调度器此时即可识别此资源类型,并调度使用此资源的pod
源码
上报资源信息
pkg/kubelet/kubelet_node_status.go中
设置node status,用于上报
func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) error {
...
setters = append(
...
kl.containerManager.GetDevicePluginResourceCapacity
...),
...
}
devicemanager
pkg/kubelet/cm/devicemanager/manager.go中
构建manager
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
socket路径为/var/lib/kubelet/device-plugins/kubelet.sock
socketPath := pluginapi.KubeletSocket
...
return newManagerImpl(socketPath, topology, topologyAffinityStore)
}
实际构建manager
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
...
构建manager对象
manager := &ManagerImpl{
endpoints: make(map[string]endpointInfo),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.Set[string]),
unhealthyDevices: make(map[string]sets.Set[string]),
allocatedDevices: make(map[string]sets.Set[string]),
podDevices: newPodDevices(),
numaNodes: numaNodes,
topologyAffinityStore: topologyAffinityStore,
devicesToReuse: make(PodReusableDevices),
update: make(chan resourceupdates.Update, 100),
}
构建devicemanager plugin server
server, err := plugin.NewServer(socketPath, manager, manager)
if err != nil {
return nil, fmt.Errorf("failed to create plugin server: %v", err)
}
manager.server = server
...
return manager, nil
}
启动device manager
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.Set[string]) error {
...
启动devicemanager plugin server
return m.server.Start()
}
获取resource的容量
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
...
遍历healthyDevices,添加容量信息
for resourceName, devices := range m.healthyDevices {
...
}
...
遍历unhealthyDevices,添加容量信息
for resourceName, devices := range m.unhealthyDevices {
...
}
返回容量信息
return capacity, allocatable, deletedResources.UnsortedList()
}
分配资源
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
...
分配资源
if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
return err
}
...
}
plugin server连接回调
func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error {
获取plugin server的options
options, err := p.API().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil {
return fmt.Errorf("failed to get device plugin options: %v", err)
}
注册plugin server endpoint
e := newEndpointImpl(p)
m.mutex.Lock()
defer m.mutex.Unlock()
m.endpoints[resourceName] = endpointInfo{e, options}
return nil
}
list watch plugin server回调
// PluginListAndWatchReceiver handles one ListAndWatch response for
// resourceName. It copies every device in resp.Devices into a slice of
// values and hands that slice, together with resourceName, to
// genericDeviceUpdateCallback.
func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
var devices []pluginapi.Device
遍历获取device信息
for _, d := range resp.Devices {
// Dereference so the slice holds Device values rather than pointers.
devices = append(devices, *d)
}
更新cache和checkpoint
m.genericDeviceUpdateCallback(resourceName, devices)
}
devicemanager plugin server
pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go中
构建devicemanager plugin server
func NewServer(socketPath string, rh RegistrationHandler, ch ClientHandler) (Server, error) {
...
s := &server{
socketName: name,
socketDir: dir,
rhandler: rh,
chandler: ch,
clients: make(map[string]Client),
}
return s, nil
}
启动devicemanager plugin server
func (s *server) Start() error {
...
go func() {
启动plugin grpc server
if err = s.grpc.Serve(ln); err != nil {
}
}()
return nil
}
注册plugin
func (s *server) Register(ctx context.Context, r *api.RegisterRequest) (*api.Empty, error) {
...
连接plugin server
if err := s.connectClient(r.ResourceName, filepath.Join(s.socketDir, r.Endpoint)); err != nil {
klog.InfoS("Error connecting to device plugin client", "err", err)
return &api.Empty{}, err
}
return &api.Empty{}, nil
}
pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go中
连接plugin server
func (s *server) connectClient(name string, socketPath string) error {
构建plugin server的client
c := NewPluginClient(name, socketPath, s.chandler)
注册plugin server的client
s.registerClient(name, c)
连接plugin server
if err := c.Connect(); err != nil {
...
return err
}
go func() {
运行plugin server的client
s.runClient(name, c)
}()
return nil
}
运行plugin server的client
func (s *server) runClient(name string, c Client) {
运行plugin server的client
c.Run()
...
}
pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go中
构建plugin server的client
// NewPluginClient returns a Client for the device plugin resource r.
// As excerpted here, the constructor only records its arguments: r as the
// resource name, socketPath as the socket that Connect later dials, and h as
// the ClientHandler that is notified from Connect and Run.
func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
return &client{
resource: r,
socket: socketPath,
handler: h,
}
}
连接plugin server
func (c *client) Connect() error {
连接plugin server
client, conn, err := dial(c.socket)
if err != nil {
...
return err
}
通知devicemanager(ClientHandler,即ManagerImpl):plugin server已连接
return c.handler.PluginConnected(c.resource, c)
}
func (c *client) Run() {
list watch plugin server
stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})
...
for {
response, err := stream.Recv()
...
通知devicemanager(ClientHandler,即ManagerImpl):收到plugin server的ListAndWatch响应
c.handler.PluginListAndWatchReceiver(c.resource, response)
}
}