背景
在 k8s 1.36 之前,想要获取节点上当前运行的 Pod 信息只能通过 apiserver,这样不仅不安全,而且扩展性很差。
k8s 1.36 新增了 PodsAPI 特性(feature gate),开启后可以直接从 kubelet 获取当前节点的 Pod 信息。
源码
cmd/kubelet/app/server.go中
启动kubelet
func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
...
启动PodsAPI服务
go k.ListenAndServePods(ctx)
}
pkg/kubelet/kubelet.go
启动PodsAPI服务
func (kl *Kubelet) ListenAndServePods(ctx context.Context) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodsAPI) {
默认路径/var/lib/kubelet/pods-api/pods-api.sock
endpoint, err := util.LocalEndpoint(kl.getPodsAPIDir(), pods.Socket)
if err != nil {
klog.FromContext(ctx).Error(err, "Failed to get local endpoint for pod api")
return
}
server.ListenAndServePodsServer(
ctx,
endpoint,
kl.podsServer,
)
}
}
pkg/kubelet/server/server.go
实际启动PodsAPI服务
func ListenAndServePodsServer(ctx context.Context, endpoint string, srv podsv1alpha1.PodsServer) {
...
创建grpc server
server := grpc.NewServer(apisgrpc.WithRateLimiter(ctx, "pods", pods.DefaultQPS, pods.DefaultBurstTokens))
注册grpc server
podsv1alpha1.RegisterPodsServer(server, srv)
创建listener
l, err := util.CreateListener(endpoint)
...
go func() {
运行grpc server
if err := server.Serve(l); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
logger.Error(err, "Failed to serve")
os.Exit(1)
}
}()
等待grpc server结束
<-ctx.Done()
logger.Info("Shutting down pods API server")
server.GracefulStop()
...
}
pkg/kubelet/apis/pods/server.go中
这里以 ListPods 为例,此外还有 WatchPods、GetPod
获取当前节点pods
// ListPods returns the pods on the current node as serialized bytes, ordered
// by UID.
//
// Pods are read from the pod manager. When the status provider has a status
// for a pod, that status replaces the one stored on the pod before it is
// marshaled. Neither ctx nor req is consulted by this implementation.
//
// It returns a codes.Internal gRPC error if any pod fails to marshal.
func (s *PodsServer) ListPods(ctx context.Context, req *podsv1alpha1.ListPodsRequest) (*podsv1alpha1.ListPodsResponse, error) {
	// Fetch the pods from the pod manager.
	podsToReturn := s.podManager.GetPods()
	// Sort by UID so the response order is deterministic.
	sort.Slice(podsToReturn, func(i, j int) bool {
		return podsToReturn[i].UID < podsToReturn[j].UID
	})
	protoPods := make([][]byte, len(podsToReturn))
	for i, p := range podsToReturn {
		podToMarshal := p
		// Prefer the status from the status provider. The override is applied
		// to a shallow copy so the pod manager's object is not modified.
		if podStatus, ok := s.statusProvider.GetPodStatus(p.UID); ok {
			podCopy := *p
			podCopy.Status = podStatus
			podToMarshal = &podCopy
		}
		// Serialize the pod.
		podBytes, err := podToMarshal.Marshal()
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to marshal pod: %v", err)
		}
		protoPods[i] = podBytes
	}
	return &podsv1alpha1.ListPodsResponse{Pods: protoPods}, nil
}