静态pod manifest文件存放目录默认为/etc/kubernetes/manifests
cmd/kubeadm/app/constants/constants.go
const (
// KubernetesDir is the directory Kubernetes owns for storing various configuration files.
// It is the parent of the manifests sub-directory named by ManifestsSubDirName.
KubernetesDir = "/etc/kubernetes"
// ManifestsSubDirName defines the directory name used to store static Pod manifests.
// Together with KubernetesDir it presumably forms the default manifest
// directory /etc/kubernetes/manifests (the joining code is not shown in this excerpt).
ManifestsSubDirName = "manifests"
...
)
创建 kube-apiserver、kube-controller-manager、kube-scheduler 的 manifest 文件
cmd/kubeadm/app/phases/controlplane/manifests.go
// CreateInitStaticPodManifestFiles writes the static Pod manifest files needed
// to bring up the control plane: one each for the API server, the controller
// manager and the scheduler.
//
// manifestDir is the directory the manifests are written to, patchesDir is an
// optional directory with patches (empty means no patching), and cfg supplies
// the cluster configuration and the local API endpoint. The work itself is
// delegated to CreateStaticPodFiles, whose error is returned unchanged.
func CreateInitStaticPodManifestFiles(manifestDir, patchesDir string, cfg *kubeadmapi.InitConfiguration) error {
	klog.V(1).Infoln("[control-plane] creating static Pod files")

	// The components that together form the control plane.
	controlPlaneComponents := []string{
		kubeadmconstants.KubeAPIServer,
		kubeadmconstants.KubeControllerManager,
		kubeadmconstants.KubeScheduler,
	}

	return CreateStaticPodFiles(manifestDir, patchesDir, &cfg.ClusterConfiguration, &cfg.LocalAPIEndpoint, controlPlaneComponents...)
}
// CreateStaticPodFiles creates the static Pod manifest file of every component
// named in componentNames.
//
// The Pod specs come from GetStaticPodSpecs, computed once for the given
// cluster configuration and API endpoint. For each requested component the
// spec is looked up, optionally patched (only when patchesDir is non-empty)
// and then written below manifestDir. Processing stops at the first failure:
// an unknown component name, a patch error or a write error is returned,
// the latter two wrapped with the component name.
func CreateStaticPodFiles(manifestDir, patchesDir string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, componentNames ...string) error {
	// Compute the specs of all known components for the current configuration.
	klog.V(1).Infoln("[control-plane] getting StaticPodSpecs")
	allSpecs := GetStaticPodSpecs(cfg, endpoint)

	for _, name := range componentNames {
		// The map lookup yields a copy, so patching below never alters allSpecs.
		pod, found := allSpecs[name]
		if !found {
			return errors.Errorf("couldn't retrieve StaticPodSpec for %q", name)
		}

		// Log every volume that the component mounts.
		for _, volume := range pod.Spec.Volumes {
			klog.V(2).Infof("[control-plane] adding volume %q for component %q", volume.Name, name)
		}

		// Apply user supplied patches only when a patches directory was given.
		if patchesDir != "" {
			patched, err := staticpodutil.PatchStaticPod(&pod, patchesDir, os.Stdout)
			if err != nil {
				return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", name)
			}
			pod = *patched
		}

		// Write the manifest to disk. Per the original annotation, a missing
		// manifestDir is created by WriteStaticPodToDisk with mode 0700.
		err := staticpodutil.WriteStaticPodToDisk(name, manifestDir, pod)
		if err != nil {
			return errors.Wrapf(err, "failed to create static pod manifest file for %q", name)
		}

		manifestPath := kubeadmconstants.GetStaticPodFilepath(name, manifestDir)
		klog.V(1).Infof("[control-plane] wrote static Pod manifest for component %q to %q\n", name, manifestPath)
	}

	return nil
}
// GetStaticPodSpecs returns the static Pod of each control plane component,
// actualized to the current configuration and keyed by component name
// (KubeAPIServer, KubeControllerManager and KubeScheduler).
//
// NB. this function holds the information about how kubeadm creates static
// pod manifests: image, command, volume mounts, probes, resources and proxy
// environment variables of every component are all decided here.
func GetStaticPodSpecs(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) map[string]v1.Pod {
	// Host path volumes and mounts required by the control plane components.
	mounts := getHostPathVolumesForTheControlPlane(cfg)

	// kube-apiserver: probed on the endpoint's bind port; the only component
	// that also gets a readiness probe and an annotation.
	apiServerContainer := v1.Container{
		Name:            kubeadmconstants.KubeAPIServer,
		Image:           images.GetKubernetesImage(kubeadmconstants.KubeAPIServer, cfg),
		ImagePullPolicy: v1.PullIfNotPresent,
		Command:         getAPIServerCommand(cfg, endpoint),
		VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeAPIServer)),
		LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", int(endpoint.BindPort), v1.URISchemeHTTPS),
		ReadinessProbe:  staticpodutil.ReadinessProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/readyz", int(endpoint.BindPort), v1.URISchemeHTTPS),
		StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetAPIServerProbeAddress(endpoint), "/livez", int(endpoint.BindPort), v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
		Resources:       staticpodutil.ComponentResources("250m"),
		Env:             kubeadmutil.GetProxyEnvVars(),
	}
	apiServerPod := staticpodutil.ComponentPod(
		apiServerContainer,
		mounts.GetVolumes(kubeadmconstants.KubeAPIServer),
		// Record the advertised endpoint on the Pod as an annotation.
		map[string]string{kubeadmconstants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey: endpoint.String()},
	)

	// kube-controller-manager: no annotations.
	controllerManagerContainer := v1.Container{
		Name:            kubeadmconstants.KubeControllerManager,
		Image:           images.GetKubernetesImage(kubeadmconstants.KubeControllerManager, cfg),
		ImagePullPolicy: v1.PullIfNotPresent,
		Command:         getControllerManagerCommand(cfg),
		VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeControllerManager)),
		LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS),
		StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetControllerManagerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeControllerManagerPort, v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
		Resources:       staticpodutil.ComponentResources("200m"),
		Env:             kubeadmutil.GetProxyEnvVars(),
	}
	controllerManagerPod := staticpodutil.ComponentPod(
		controllerManagerContainer,
		mounts.GetVolumes(kubeadmconstants.KubeControllerManager),
		nil,
	)

	// kube-scheduler: no annotations.
	schedulerContainer := v1.Container{
		Name:            kubeadmconstants.KubeScheduler,
		Image:           images.GetKubernetesImage(kubeadmconstants.KubeScheduler, cfg),
		ImagePullPolicy: v1.PullIfNotPresent,
		Command:         getSchedulerCommand(cfg),
		VolumeMounts:    staticpodutil.VolumeMountMapToSlice(mounts.GetVolumeMounts(kubeadmconstants.KubeScheduler)),
		LivenessProbe:   staticpodutil.LivenessProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS),
		StartupProbe:    staticpodutil.StartupProbe(staticpodutil.GetSchedulerProbeAddress(cfg), "/healthz", kubeadmconstants.KubeSchedulerPort, v1.URISchemeHTTPS, cfg.APIServer.TimeoutForControlPlane),
		Resources:       staticpodutil.ComponentResources("100m"),
		Env:             kubeadmutil.GetProxyEnvVars(),
	}
	schedulerPod := staticpodutil.ComponentPod(
		schedulerContainer,
		mounts.GetVolumes(kubeadmconstants.KubeScheduler),
		nil,
	)

	return map[string]v1.Pod{
		kubeadmconstants.KubeAPIServer:         apiServerPod,
		kubeadmconstants.KubeControllerManager: controllerManagerPod,
		kubeadmconstants.KubeScheduler:         schedulerPod,
	}
}
创建etcd manifest文件
cmd/kubeadm/app/phases/etcd/local.go
const (
// etcdVolumeName is the name of the Pod volume backing the etcd data directory.
etcdVolumeName = "etcd-data"
// certsVolumeName is the name of the Pod volume holding the etcd certificates.
certsVolumeName = "etcd-certs"
// etcdHealthyCheckInterval and etcdHealthyCheckRetries are the interval and
// retry count passed to WaitForClusterAvailable when a new member joins;
// their product is printed to the user as the maximum wait time.
etcdHealthyCheckInterval = 5 * time.Second
etcdHealthyCheckRetries = 8
)
// CreateLocalEtcdStaticPodManifestFile writes the static Pod manifest file of
// a local etcd member into manifestDir.
//
// It is used by init - when the etcd cluster is empty - or by kubeadm upgrade -
// when the etcd cluster is already up and running (and the --initial-cluster
// flag has no impact). The Pod is therefore built with an empty member list.
//
// An error is returned when the cluster is configured for external etcd, or
// when patching (only attempted if patchesDir is non-empty) or writing the
// manifest fails.
func CreateLocalEtcdStaticPodManifestFile(manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) error {
	// With external etcd there is no local member to generate a manifest for.
	if cfg.Etcd.External != nil {
		return errors.New("etcd static pod manifest cannot be generated for cluster using external etcd")
	}

	// Build the etcd Pod without any pre-existing members.
	etcdPod := GetEtcdPodSpec(cfg, endpoint, nodeName, []etcdutil.Member{})

	// Apply user supplied patches only when a patches directory was given.
	if patchesDir != "" {
		patched, err := staticpodutil.PatchStaticPod(&etcdPod, patchesDir, os.Stdout)
		if err != nil {
			return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
		}
		etcdPod = *patched
	}

	// Persist the manifest; a write error is returned as is.
	err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, etcdPod)
	if err != nil {
		return err
	}

	manifestPath := kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)
	klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", manifestPath)
	return nil
}
// CheckLocalEtcdClusterStatus verifies the health state of the local/stacked
// etcd cluster before a new etcd member is installed.
//
// It builds an etcd client from the cluster (using the certificates in
// cfg.CertificatesDir) and asks it for the cluster health. A client creation
// error is returned as is; a failed health check is returned wrapped with
// "etcd cluster is not healthy".
func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.ClusterConfiguration) error {
	klog.V(1).Info("[etcd] Checking etcd cluster health")

	// The client connects to all the local/stacked etcd members.
	klog.V(1).Info("creating etcd client that connects to etcd pods")
	etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
	if err != nil {
		return err
	}

	err = etcdClient.CheckClusterHealth()
	if err == nil {
		return nil
	}
	return errors.Wrap(err, "etcd cluster is not healthy")
}
// RemoveStackedEtcdMemberFromCluster removes the local etcd member from the
// etcd cluster when the control plane node is being reset.
//
// The member is identified by the peer URL derived from cfg.LocalAPIEndpoint.
// Removal is skipped (nil is returned) when the cluster has exactly one member
// and one of the client's endpoints equals the local client URL. Errors from
// creating the client, listing members, resolving the member ID or removing
// the member are returned unchanged.
func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
	// The client connects to all the local/stacked etcd members.
	klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
	etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
	if err != nil {
		return err
	}

	members, err := etcdClient.ListMembers()
	if err != nil {
		return err
	}

	// When this node hosts the only remaining stacked etcd member there is
	// no need to call RemoveMember().
	if len(members) == 1 {
		localClientURL := etcdutil.GetClientURL(&cfg.LocalAPIEndpoint)
		for _, candidate := range etcdClient.Endpoints {
			if candidate != localClientURL {
				continue
			}
			klog.V(1).Info("[etcd] This is the only remaining etcd member in the etcd cluster, skip removing it")
			return nil
		}
	}

	// Look up the local member by its peer URL and announce its removal to
	// the other members of the etcd cluster.
	localPeerURL := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)
	klog.V(2).Infof("[etcd] get the member id from peer: %s", localPeerURL)
	memberID, err := etcdClient.GetMemberID(localPeerURL)
	if err != nil {
		return err
	}

	klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", localPeerURL, memberID)
	remaining, err := etcdClient.RemoveMember(memberID)
	if err != nil {
		return err
	}

	klog.V(1).Infof("[etcd] Updated etcd member list: %v", remaining)
	return nil
}
// CreateStackedEtcdStaticPodManifestFile writes the local etcd static Pod
// manifest file for an additional etcd member that is joining an existing
// local/stacked etcd cluster. The other members of the etcd cluster are
// notified of the joining node beforehand.
//
// Steps, in order:
//  1. list the current members and look for one whose peer URL matches the
//     URL derived from endpoint; a match with an empty name is given nodeName;
//  2. if there is no match, add the node as a new member;
//  3. build the etcd Pod for the resulting member list, patch it when
//     patchesDir is non-empty, and write it below manifestDir;
//  4. wait for the etcd cluster to become available.
//
// The first error encountered in any step is returned.
func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint) error {
	// The client connects to all the local/stacked etcd members.
	klog.V(1).Info("creating etcd client that connects to etcd pods")
	etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
	if err != nil {
		return err
	}

	peerURL := etcdutil.GetPeerURL(endpoint)

	klog.V(1).Infoln("[etcd] Getting the list of existing members")
	initialCluster, err := etcdClient.ListMembers()
	if err != nil {
		return err
	}

	// Only add the new member if it does not already exist.
	alreadyMember := false
	klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", peerURL)
	for i := range initialCluster {
		// Take a pointer so that the name fix-up lands in the slice itself.
		member := &initialCluster[i]
		if member.PeerURL != peerURL {
			continue
		}
		alreadyMember = true
		if len(member.Name) == 0 {
			klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName)
			member.Name = nodeName
		}
		break
	}

	if !alreadyMember {
		klog.V(1).Infof("[etcd] Adding etcd member: %s", peerURL)
		initialCluster, err = etcdClient.AddMember(nodeName, peerURL)
		if err != nil {
			return err
		}
		fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
		klog.V(1).Infof("Updated etcd member list: %v", initialCluster)
	} else {
		klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint)
	}

	fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)

	// Build the etcd Pod for the current configuration and the new list of
	// etcd members.
	etcdPod := GetEtcdPodSpec(cfg, endpoint, nodeName, initialCluster)

	// Apply user supplied patches only when a patches directory was given.
	if patchesDir != "" {
		patched, err := staticpodutil.PatchStaticPod(&etcdPod, patchesDir, os.Stdout)
		if err != nil {
			return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
		}
		etcdPod = *patched
	}

	// Persist the manifest; a write error is returned as is.
	if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, etcdPod); err != nil {
		return err
	}

	// Upper bound of the wait below: retries times interval.
	maxWait := etcdHealthyCheckInterval * etcdHealthyCheckRetries
	fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", maxWait)
	if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
		return err
	}

	return nil
}
// GetEtcdPodSpec returns the etcd static Pod actualized to the context of the
// current configuration.
//
// The Pod has a single etcd container with two host path volumes (the etcd
// data directory and the "/etcd" sub-directory of the certificates directory),
// both mounted at the same path inside the container. It also carries fixed
// resource requests, liveness and startup probes against "/health", and an
// annotation recording the etcd client URL derived from endpoint.
//
// NB. GetEtcdPodSpec holds the information about how kubeadm creates etcd
// static pod manifests.
func GetEtcdPodSpec(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) v1.Pod {
	hostPathType := v1.HostPathDirectoryOrCreate

	volumes := make(map[string]v1.Volume, 2)
	volumes[etcdVolumeName] = staticpodutil.NewVolume(etcdVolumeName, cfg.Etcd.Local.DataDir, &hostPathType)
	volumes[certsVolumeName] = staticpodutil.NewVolume(certsVolumeName, cfg.CertificatesDir+"/etcd", &hostPathType)

	// The probe hostname is the localhost address of the IP family that
	// matches the endpoint's AdvertiseAddress.
	probeHostname, probePort, probeScheme := staticpodutil.GetEtcdProbeEndpoint(&cfg.Etcd, utilsnet.IsIPv6String(endpoint.AdvertiseAddress))

	etcdContainer := v1.Container{
		Name:            kubeadmconstants.Etcd,
		Command:         getEtcdCommand(cfg, endpoint, nodeName, initialCluster),
		Image:           images.GetEtcdImage(cfg),
		ImagePullPolicy: v1.PullIfNotPresent,
		// The etcd data directory is mounted read-write so that etcd can
		// store its data in a more persistent manner.
		VolumeMounts: []v1.VolumeMount{
			staticpodutil.NewVolumeMount(etcdVolumeName, cfg.Etcd.Local.DataDir, false),
			staticpodutil.NewVolumeMount(certsVolumeName, cfg.CertificatesDir+"/etcd", false),
		},
		Resources: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU:              resource.MustParse("100m"),
				v1.ResourceMemory:           resource.MustParse("100Mi"),
				v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
			},
		},
		LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/health", probePort, probeScheme),
		StartupProbe:  staticpodutil.StartupProbe(probeHostname, "/health", probePort, probeScheme, cfg.APIServer.TimeoutForControlPlane),
	}

	// etcd listens on the advertise address of the API server, on a
	// different port (2379).
	annotations := map[string]string{
		kubeadmconstants.EtcdAdvertiseClientUrlsAnnotationKey: etcdutil.GetClientURL(endpoint),
	}

	return staticpodutil.ComponentPod(etcdContainer, volumes, annotations)
}
静态pod配置
cmd/kubeadm/app/util/staticpod/utils.go
// ComponentPod builds a Pod object from a container, a set of volumes and a
// set of annotations.
//
// The Pod is named after the container, lives in the "kube-system" namespace,
// runs the single given container on the host network with the
// "system-node-critical" priority class, and carries the given annotations
// and the volumes converted from map to slice.
func ComponentPod(container v1.Container, volumes map[string]v1.Volume, annotations map[string]string) v1.Pod {
	podMeta := metav1.ObjectMeta{
		Name:      container.Name,
		Namespace: "kube-system",
		// The component and tier labels make it easy to identify the control
		// plane Pods when doing a .List() against Pods in the kube-system
		// namespace; they can for example be used together with the
		// WaitForPodsWithLabel function.
		Labels:      map[string]string{"component": container.Name, "tier": "control-plane"},
		Annotations: annotations,
	}

	podSpec := v1.PodSpec{
		Containers:        []v1.Container{container},
		PriorityClassName: "system-node-critical",
		// With HostNetwork set to true the Pod uses the host's DNS and network
		// configuration, so applications in the Pod see the host's network
		// interfaces directly. All containers of the Pod are exposed on the
		// host network, reachable through any interface of the host, and the
		// Pod IP is the IP of its Node. The trade-off noted by the original
		// annotation: the cluster DNS service is not used (no DNS policy is
		// set here), so Services cannot be reached by their DNS names.
		HostNetwork: true,
		Volumes:     VolumeMapToSlice(volumes),
	}

	return v1.Pod{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "Pod",
		},
		ObjectMeta: podMeta,
		Spec:       podSpec,
	}
}