kubeadm创建控制面静态pod manifest文件

静态pod manifest文件存放目录默认为/etc/kubernetes/manifests

cmd/kubeadm/app/constants/constants.go

// Excerpt of kubeadm's shared constants; the "..." line below marks declarations
// that this excerpt leaves out.
const (
    // KubernetesDir is the directory Kubernetes owns for storing various configuration files
    KubernetesDir = "/etc/kubernetes"
    // ManifestsSubDirName defines directory name to store manifests.
    // NOTE(review): presumably joined under KubernetesDir to form the default static
    // Pod manifest directory (/etc/kubernetes/manifests) — the code that builds the
    // path is not shown here, confirm against it.
    ManifestsSubDirName = "manifests"
    ...
)

创建 kube-apiserver、kube-controller-manager、kube-scheduler 的 manifest 文件

cmd/kubeadm/app/phases/controlplane/manifests.go

// CreateInitStaticPodManifestFiles writes the static Pod manifests needed to bring up
// the control plane: one each for the KubeAPIServer, KubeControllerManager and
// KubeScheduler components.
//
// The work is delegated to CreateStaticPodFiles, which receives the
// ClusterConfiguration and LocalAPIEndpoint embedded in cfg; manifestDir and
// patchesDir are forwarded unchanged and the delegate's error is returned as is.
func CreateInitStaticPodManifestFiles(manifestDir, patchesDir string, cfg *kubeadmapi.InitConfiguration) error {
    klog.V(1).Infoln("[control-plane] creating static Pod files")

    controlPlaneComponents := []string{
        kubeadmconstants.KubeAPIServer,
        kubeadmconstants.KubeControllerManager,
        kubeadmconstants.KubeScheduler,
    }
    return CreateStaticPodFiles(manifestDir, patchesDir, &cfg.ClusterConfiguration, &cfg.LocalAPIEndpoint, controlPlaneComponents...)
}


// CreateStaticPodFiles creates the static Pod manifest file of every component
// named in componentNames.
//
// All specs are computed once through GetStaticPodSpecs(cfg, endpoint). For each
// requested name the function logs the Pod's volumes, runs the Pod through
// staticpodutil.PatchStaticPod when patchesDir is non-empty, and writes the result
// under manifestDir. Processing stops at the first failure; an error is returned
// when a name has no spec, when patching fails, or when the manifest cannot be
// written.
func CreateStaticPodFiles(manifestDir, patchesDir string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, componentNames ...string) error {
    // Compute the specs for the current ClusterConfiguration up front.
    klog.V(1).Infoln("[control-plane] getting StaticPodSpecs")
    allSpecs := GetStaticPodSpecs(cfg, endpoint)

    for _, name := range componentNames {
        // Look up the spec of the requested component; unknown names are an error.
        pod, found := allSpecs[name]
        if !found {
            return errors.Errorf("couldn't retrieve StaticPodSpec for %q", name)
        }

        // Log every volume declared by the component's Pod.
        for i := range pod.Spec.Volumes {
            klog.V(2).Infof("[control-plane] adding volume %q for component %q", pod.Spec.Volumes[i].Name, name)
        }

        // Patching is optional: it only happens when a patches directory was given.
        if patchesDir != "" {
            patched, err := staticpodutil.PatchStaticPod(&pod, patchesDir, os.Stdout)
            if err != nil {
                return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", name)
            }
            pod = *patched
        }

        // Write the Pod to disk.
        // NOTE(review): per the original author's note, WriteStaticPodToDisk creates
        // manifestDir with mode 0700 when it does not exist — confirm against that helper.
        err := staticpodutil.WriteStaticPodToDisk(name, manifestDir, pod)
        if err != nil {
            return errors.Wrapf(err, "failed to create static pod manifest file for %q", name)
        }

        manifestPath := kubeadmconstants.GetStaticPodFilepath(name, manifestDir)
        klog.V(1).Infof("[control-plane] wrote static Pod manifest for component %q to %q\n", name, manifestPath)
    }

    return nil
}


// GetStaticPodSpecs builds the static Pods of the control-plane components for the
// given configuration and returns them keyed by component name (KubeAPIServer,
// KubeControllerManager, KubeScheduler).
//
// NB. this function is where kubeadm decides what goes into each control-plane
// manifest: image, command line, volume mounts, probes, CPU request and proxy
// environment variables. Only the API server Pod gets a readiness probe and an
// annotation (its advertise address endpoint); the other two Pods carry no
// annotations.
func GetStaticPodSpecs(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) map[string]v1.Pod {
    // Host path volumes and mounts required by the control-plane components.
    mounts := getHostPathVolumesForTheControlPlane(cfg)

    // kube-apiserver: probed on the endpoint's bind port over HTTPS.
    apiServerContainer := v1.Container{
        Name:            kubeadmconstants.KubeAPIServer,
        Image:           images.GetKubernetesImage(kubeadmconstants.KubeAPIServer, cfg),
        ImagePullPolicy: v1.PullIfNotPresent,
        Command:         getAPIServerCommand(cfg, endpoint),
        VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeAPIServer)),
        LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", int(endpoint.BindPort), v1.URISchemeHTTPS),
        ReadinessProbe:  staticpodutil.ReadinessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/readyz", int(endpoint.BindPort), v1.URISchemeHTTPS),
        StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", int(endpoint.BindPort), v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
        Resources:       staticpodutil.ComponentResources("250m"),
        Env:             kubeadmutil.GetProxyEnvVars(),
    }
    apiServerPod := staticpodutil.ComponentPod(
        apiServerContainer,
        mounts.GetVolumes(kubeadmconstants.KubeAPIServer),
        map[string]string{kubeadmconstants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey: endpoint.String()},
    )

    // kube-controller-manager: probed on its well-known port over HTTPS.
    controllerManagerContainer := v1.Container{
        Name:            kubeadmconstants.KubeControllerManager,
        Image:           images.GetKubernetesImage(kubeadmconstants.KubeControllerManager, cfg),
        ImagePullPolicy: v1.PullIfNotPresent,
        Command:         getControllerManagerCommand(cfg),
        VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeControllerManager)),
        LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS),
        StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
        Resources:       staticpodutil.ComponentResources("200m"),
        Env:             kubeadmutil.GetProxyEnvVars(),
    }
    controllerManagerPod := staticpodutil.ComponentPod(
        controllerManagerContainer,
        mounts.GetVolumes(kubeadmconstants.KubeControllerManager),
        nil,
    )

    // kube-scheduler: probed on its well-known port over HTTPS.
    schedulerContainer := v1.Container{
        Name:            kubeadmconstants.KubeScheduler,
        Image:           images.GetKubernetesImage(kubeadmconstants.KubeScheduler, cfg),
        ImagePullPolicy: v1.PullIfNotPresent,
        Command:         getSchedulerCommand(cfg),
        VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeScheduler)),
        LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS),
        StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
        Resources:       staticpodutil.ComponentResources("100m"),
        Env:             kubeadmutil.GetProxyEnvVars(),
    }
    schedulerPod := staticpodutil.ComponentPod(
        schedulerContainer,
        mounts.GetVolumes(kubeadmconstants.KubeScheduler),
        nil,
    )

    return map[string]v1.Pod{
        kubeadmconstants.KubeAPIServer:         apiServerPod,
        kubeadmconstants.KubeControllerManager: controllerManagerPod,
        kubeadmconstants.KubeScheduler:         schedulerPod,
    }
}


创建etcd manifest文件

cmd/kubeadm/app/phases/etcd/local.go

// Volume names used by the etcd static Pod and the parameters of the wait
// performed after a stacked etcd member's manifest has been written.
const (
    // etcdVolumeName names the volume backed by cfg.Etcd.Local.DataDir.
    etcdVolumeName           = "etcd-data"
    // certsVolumeName names the volume backed by the "/etcd" subdirectory of cfg.CertificatesDir.
    certsVolumeName          = "etcd-certs"
    // etcdHealthyCheckInterval and etcdHealthyCheckRetries are passed to
    // WaitForClusterAvailable; their product (40s) is the maximum wait
    // announced to the user.
    etcdHealthyCheckInterval = 5 * time.Second
    etcdHealthyCheckRetries  = 8
)

// CreateLocalEtcdStaticPodManifestFile writes the static Pod manifest of a local
// etcd member into manifestDir.
//
// It is used by init, when the etcd cluster is empty, and by kubeadm upgrade, when
// the etcd cluster is already up and running (in which case the --initial-cluster
// flag has no impact). The Pod is built with an empty member list, run through
// staticpodutil.PatchStaticPod when patchesDir is non-empty, and then written to disk.
//
// An error is returned when cfg describes an external etcd, when patching fails,
// or when the manifest cannot be written.
func CreateLocalEtcdStaticPodManifestFile(manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) error {
    // A manifest only makes sense when kubeadm itself runs etcd.
    if cfg.Etcd.External != nil {
        return errors.New("etcd static pod manifest cannot be generated for cluster using external etcd")
    }

    // Build the etcd Pod without any pre-existing members.
    etcdPod := GetEtcdPodSpec(cfg, endpoint, nodeName, []etcdutil.Member{})

    // Patching is optional: it only happens when a patches directory was given.
    if patchesDir != "" {
        patched, patchErr := staticpodutil.PatchStaticPod(&etcdPod, patchesDir, os.Stdout)
        if patchErr != nil {
            return errors.Wrapf(patchErr, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
        }
        etcdPod = *patched
    }

    // Persist the manifest; a write error is returned unwrapped.
    writeErr := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, etcdPod)
    if writeErr != nil {
        return writeErr
    }

    manifestPath := kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)
    klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", manifestPath)
    return nil
}

// CheckLocalEtcdClusterStatus verifies that the local/stacked etcd cluster is
// healthy; it is meant to run before a new etcd member is installed.
//
// An etcd client is created with etcdutil.NewFromCluster from client and
// cfg.CertificatesDir. A failure to create that client is returned unchanged; a
// failed health check is returned wrapped as "etcd cluster is not healthy".
func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.ClusterConfiguration) error {
    klog.V(1).Info("[etcd] Checking etcd cluster health")

    // Build a client that connects to the local/stacked etcd members.
    klog.V(1).Info("creating etcd client that connects to etcd pods")
    etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
    if err != nil {
        return err
    }

    // Ask the cluster for its health state.
    if healthErr := etcdClient.CheckClusterHealth(); healthErr != nil {
        return errors.Wrap(healthErr, "etcd cluster is not healthy")
    }
    return nil
}

// RemoveStackedEtcdMemberFromCluster removes the local etcd member from the etcd
// cluster; it is used when the control plane node is reset.
//
// When the cluster has exactly one member and one of the client's endpoints equals
// this node's etcd client URL, nothing is removed and nil is returned. Otherwise
// the member ID is looked up from this node's peer URL and the member is removed.
// Errors from creating the client, listing members, resolving the ID or removing
// the member are returned unchanged.
func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
    // Build a client that connects to the local/stacked etcd members.
    klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
    etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
    if err != nil {
        return err
    }

    currentMembers, err := etcdClient.ListMembers()
    if err != nil {
        return err
    }

    // The last remaining stacked member does not need a RemoveMember() call.
    if len(currentMembers) == 1 {
        localClientURL := etcdutil.GetClientURL(&cfg.LocalAPIEndpoint)
        for i := range etcdClient.Endpoints {
            if etcdClient.Endpoints[i] != localClientURL {
                continue
            }
            klog.V(1).Info("[etcd] This is the only remaining etcd member in the etcd cluster, skip removing it")
            return nil
        }
    }

    // Tell the other members of the etcd cluster that this member is leaving.
    etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)

    klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
    memberID, err := etcdClient.GetMemberID(etcdPeerAddress)
    if err != nil {
        return err
    }

    klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, memberID)
    remainingMembers, err := etcdClient.RemoveMember(memberID)
    if err != nil {
        return err
    }

    klog.V(1).Infof("[etcd] Updated etcd member list: %v", remainingMembers)
    return nil
}

// CreateStackedEtcdStaticPodManifestFile writes the etcd static Pod manifest for an
// additional member that joins an existing local/stacked etcd cluster.
//
// Steps, in order:
//  1. create an etcd client from client and cfg.CertificatesDir and list the members;
//  2. if no member has this node's peer URL, announce the new member with AddMember;
//     if one does but has an empty name, set its name to nodeName in the local list;
//  3. build the Pod with GetEtcdPodSpec from the resulting member list, run it through
//     staticpodutil.PatchStaticPod when patchesDir is non-empty, and write it to
//     manifestDir;
//  4. wait for the cluster to become available using etcdHealthyCheckRetries and
//     etcdHealthyCheckInterval.
//
// The first error encountered is returned; only patch errors are wrapped.
func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) error {
    // Build a client that connects to the local/stacked etcd members.
    klog.V(1).Info("creating etcd client that connects to etcd pods")
    etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
    if err != nil {
        return err
    }

    etcdPeerAddress := etcdutil.GetPeerURL(endpoint)

    klog.V(1).Infoln("[etcd] Getting the list of existing members")
    initialCluster, err := etcdClient.ListMembers()
    if err != nil {
        return err
    }

    // Look for a member that already advertises this node's peer URL; the new
    // member is only added when none is found.
    alreadyMember := false
    klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress)
    for i := range initialCluster {
        member := &initialCluster[i]
        if member.PeerURL != etcdPeerAddress {
            continue
        }
        alreadyMember = true
        if member.Name == "" {
            klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName)
            member.Name = nodeName
        }
        break
    }

    if !alreadyMember {
        klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
        initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
        if err != nil {
            return err
        }

        fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
        klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
    } else {
        klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint)
    }

    fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)

    // Build the etcd Pod for the current configuration and the updated member list.
    etcdPod := GetEtcdPodSpec(cfg, endpoint, nodeName, initialCluster)

    // Patching is optional: it only happens when a patches directory was given.
    if patchesDir != "" {
        patched, patchErr := staticpodutil.PatchStaticPod(&etcdPod, patchesDir, os.Stdout)
        if patchErr != nil {
            return errors.Wrapf(patchErr, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
        }
        etcdPod = *patched
    }

    // Persist the manifest; a write error is returned unwrapped.
    if err = staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, etcdPod); err != nil {
        return err
    }

    maxWait := etcdHealthyCheckInterval * etcdHealthyCheckRetries
    fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", maxWait)
    _, err = etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval)
    return err
}

// GetEtcdPodSpec returns the etcd static Pod for the given configuration.
//
// NB. this function is where kubeadm decides what goes into the etcd manifest:
//   - two host path volumes of type DirectoryOrCreate, mounted writable: the etcd
//     data directory (cfg.Etcd.Local.DataDir) and cfg.CertificatesDir+"/etcd";
//   - the command line produced by getEtcdCommand from cfg, endpoint, nodeName and
//     initialCluster;
//   - requests of 100m CPU, 100Mi memory and 100Mi ephemeral storage;
//   - liveness and startup probes on "/health" at the endpoint reported by
//     staticpodutil.GetEtcdProbeEndpoint;
//   - an annotation carrying the etcd client URL derived from endpoint.
func GetEtcdPodSpec(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) v1.Pod {
    dataDir := cfg.Etcd.Local.DataDir
    etcdCertsDir := cfg.CertificatesDir + "/etcd"

    hostPathType := v1.HostPathDirectoryOrCreate
    etcdMounts := map[string]v1.Volume{
        etcdVolumeName:  staticpodutil.NewVolume(etcdVolumeName, dataDir, &hostPathType),
        certsVolumeName: staticpodutil.NewVolume(certsVolumeName, etcdCertsDir, &hostPathType),
    }

    // The probe endpoint is chosen with knowledge of whether the advertise address is IPv6.
    advertisesIPv6 := utilsnet.IsIPv6String(endpoint.AdvertiseAddress)
    probeHostname, probePort, probeScheme := staticpodutil.GetEtcdProbeEndpoint(&cfg.Etcd, advertisesIPv6)

    etcdContainer := v1.Container{
        Name:            kubeadmconstants.Etcd,
        Command:         getEtcdCommand(cfg, endpoint, nodeName, initialCluster),
        Image:           images.GetEtcdImage(cfg),
        ImagePullPolicy: v1.PullIfNotPresent,
        // Both mounts are read-write (readOnly=false) so etcd can persist its data on the host.
        VolumeMounts: []v1.VolumeMount{
            staticpodutil.NewVolumeMount(etcdVolumeName, dataDir, false),
            staticpodutil.NewVolumeMount(certsVolumeName, etcdCertsDir, false),
        },
        Resources: v1.ResourceRequirements{
            Requests: v1.ResourceList{
                v1.ResourceCPU:              resource.MustParse("100m"),
                v1.ResourceMemory:           resource.MustParse("100Mi"),
                v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
            },
        },
        LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/health", probePort, probeScheme),
        StartupProbe:  staticpodutil.StartupProbe(probeHostname, "/health", probePort, probeScheme, cfg.APIServer.TimeoutForControlPlane),
    }

    // etcd listens on the advertise address of the API server, on a different port (2379).
    etcdAnnotations := map[string]string{
        kubeadmconstants.EtcdAdvertiseClientUrlsAnnotationKey: etcdutil.GetClientURL(endpoint),
    }

    return staticpodutil.ComponentPod(etcdContainer, etcdMounts, etcdAnnotations)
}

静态pod配置

cmd/kubeadm/app/util/staticpod/utils.go

// ComponentPod assembles a single-container v1.Pod from a container, its volumes
// and its annotations.
//
// The Pod is named after the container, placed in the "kube-system" namespace,
// given the "system-node-critical" priority class and run with host networking.
// volumes is converted to a slice with VolumeMapToSlice; annotations is stored as
// given and may be nil.
func ComponentPod(container v1.Container, volumes map[string]v1.Volume, annotations map[string]string) v1.Pod {
    // The component and tier labels make it easy to pick out the control plane Pods
    // when listing Pods in the kube-system namespace; they can for example be used
    // together with the WaitForPodsWithLabel function.
    podLabels := map[string]string{"component": container.Name, "tier": "control-plane"}

    podMeta := metav1.ObjectMeta{
        Name:        container.Name,
        Namespace:   "kube-system",
        Labels:      podLabels,
        Annotations: annotations,
    }

    podSpec := v1.PodSpec{
        Containers:        []v1.Container{container},
        PriorityClassName: "system-node-critical",
        // With HostNetwork set to true the Pod uses the host's network configuration:
        // its containers see the host's network interfaces, the Pod IP is the IP of
        // the node, and the application is reachable through the host's interfaces.
        // NOTE(review): per the original author's note, such a Pod also uses the
        // host's DNS settings, so it cannot resolve names through the cluster DNS
        // service (and therefore cannot reach Services by name); no DNS policy is set
        // here to change that — confirm against the Kubernetes DNS policy docs.
        HostNetwork: true,
        Volumes:     VolumeMapToSlice(volumes),
    }

    return v1.Pod{
        TypeMeta: metav1.TypeMeta{
            APIVersion: "v1",
            Kind:       "Pod",
        },
        ObjectMeta: podMeta,
        Spec:       podSpec,
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容