kubelet启动流程解析(四)kubelet启动流程简析

概述

前文我们介绍了kubelet参数初始化,接下来我们分析下kubelet服务实际启动逻辑。

整体流程大致如下:

  1. 设置全局门控特性
  2. kubelet参数合法性检测
  3. 注册当前配置至/configz端点
  4. 检查kubelet启动模式是否为standalone模式
  5. 检测kubeDeps是否为空,为空则初始化
  6. 获取主机名称,用于初始化事件记录器
  7. standalone模式下关闭所有客户端连接
  8. 初始化身份认证接口
  9. 初始化cgroups
  10. 初始化cAdvisor
  11. 初始化事件记录器,用于向kubelet端事件
  12. 初始化容器管理器
  13. 检测是否以root用户运行kubelet
  14. kubelet进程设置OOM分数
  15. 容器运行时初始化
  16. 启动kubelet
  17. 如果开启动态配置,则监听动态配置中的配置变化
  18. 开启/healthz端点
  19. 通知init进程kubelet服务启动完毕

针对上述步骤我们接下来逐一分析,针对部分内容(容器运行时启动流程、kubelet启动流程)不做过多讨论,后续篇幅再做分析。

部分通用内容(如cAdvisor/configz端点、/healthz端点,OOM分数等),后续篇幅讨论。

函数调用

if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
    klog.Fatal(err)
}
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())
    if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

最终启动逻辑为run函数

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
    // Set global feature gates based on the value on the initial KubeletServer
    err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }
    // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
    if err := options.ValidateKubeletServer(s); err != nil {
        return err
    }

    // Obtain Kubelet Lock File
    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }
    done := make(chan struct{})
    if s.LockFilePath != "" {
        klog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            klog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }

    // Register current configuration with /configz endpoint
    err = initConfigz(&s.KubeletConfiguration)
    if err != nil {
        klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
    }

    if len(s.ShowHiddenMetricsForVersion) > 0 {
        metrics.SetShowHidden()
    }

    // About to get clients and such, detect standaloneMode
    standaloneMode := true
    if len(s.KubeConfig) > 0 {
        standaloneMode = false
    }

    if kubeDeps == nil {
        kubeDeps, err = UnsecuredDependencies(s, featureGate)
        if err != nil {
            return err
        }
    }

    if kubeDeps.Cloud == nil {
        if !cloudprovider.IsExternal(s.CloudProvider) {
            cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            if cloud == nil {
                klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            } else {
                klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            }
            kubeDeps.Cloud = cloud
        }
    }

    hostName, err := nodeutil.GetHostname(s.HostnameOverride)
    if err != nil {
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
    if err != nil {
        return err
    }

    // if in standalone mode, indicate as much by setting all clients to nil
    switch {
    case standaloneMode:
        kubeDeps.KubeClient = nil
        kubeDeps.EventClient = nil
        kubeDeps.HeartbeatClient = nil
        klog.Warningf("standalone mode, no API client")

    case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
        clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
        if err != nil {
            return err
        }
        if closeAllConns == nil {
            return errors.New("closeAllConns must be a valid function other than nil")
        }
        kubeDeps.OnHeartbeatFailure = closeAllConns

        kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet client: %v", err)
        }

        // make a separate client for events
        eventClientConfig := *clientConfig
        eventClientConfig.QPS = float32(s.EventRecordQPS)
        eventClientConfig.Burst = int(s.EventBurst)
        kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet event client: %v", err)
        }

        // make a separate client for heartbeat with throttling disabled and a timeout attached
        heartbeatClientConfig := *clientConfig
        heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
        // The timeout is the minimum of the lease duration and status update frequency
        leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
        if heartbeatClientConfig.Timeout > leaseTimeout {
            heartbeatClientConfig.Timeout = leaseTimeout
        }

        heartbeatClientConfig.QPS = float32(-1)
        kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
        }
    }

    if kubeDeps.Auth == nil {
        auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
        runAuthenticatorCAReload(stopCh)
    }

    var cgroupRoots []string

    cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
    kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
    if err != nil {
        klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
    } else if kubeletCgroup != "" {
        cgroupRoots = append(cgroupRoots, kubeletCgroup)
    }

    runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
    if err != nil {
        klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
    } else if runtimeCgroup != "" {
        // RuntimeCgroups is optional, so ignore if it isn't specified
        cgroupRoots = append(cgroupRoots, runtimeCgroup)
    }

    if s.SystemCgroups != "" {
        // SystemCgroups is optional, so ignore if it isn't specified
        cgroupRoots = append(cgroupRoots, s.SystemCgroups)
    }

    if kubeDeps.CAdvisorInterface == nil {
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
        kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
        if err != nil {
            return err
        }
    }

    // Setup event recorder if required.
    makeEventRecorder(kubeDeps, nodeName)

    if kubeDeps.ContainerManager == nil {
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
            klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
            s.CgroupRoot = "/"
        }

        var reservedSystemCPUs cpuset.CPUSet
        var errParse error
        if s.ReservedSystemCPUs != "" {
            reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
            if errParse != nil {
                // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
                klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
                return errParse
            }
            // is it safe do use CAdvisor here ??
            machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
            if err != nil {
                // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
                klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
                reservedSystemCPUs = cpuset.NewCPUSet()
            } else {
                reservedList := reservedSystemCPUs.ToSlice()
                first := reservedList[0]
                last := reservedList[len(reservedList)-1]
                if first < 0 || last >= machineInfo.NumCores {
                    // the specified cpuset is outside of the range of what the machine has
                    klog.Infof("Invalid cpuset specified by --reserved-cpus")
                    return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
                }
            }
        } else {
            reservedSystemCPUs = cpuset.NewCPUSet()
        }

        if reservedSystemCPUs.Size() > 0 {
            // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
            klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
            if s.KubeReserved != nil {
                delete(s.KubeReserved, "cpu")
            }
            if s.SystemReserved == nil {
                s.SystemReserved = make(map[string]string)
            }
            s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
            klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
        }
        kubeReserved, err := parseResourceList(s.KubeReserved)
        if err != nil {
            return err
        }
        systemReserved, err := parseResourceList(s.SystemReserved)
        if err != nil {
            return err
        }
        var hardEvictionThresholds []evictionapi.Threshold
        // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
        if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
            hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
            if err != nil {
                return err
            }
        }
        experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
        if err != nil {
            return err
        }

        devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
            cm.NodeConfig{
                RuntimeCgroupsName:    s.RuntimeCgroups,
                SystemCgroupsName:     s.SystemCgroups,
                KubeletCgroupsName:    s.KubeletCgroups,
                ContainerRuntime:      s.ContainerRuntime,
                CgroupsPerQOS:         s.CgroupsPerQOS,
                CgroupRoot:            s.CgroupRoot,
                CgroupDriver:          s.CgroupDriver,
                KubeletRootDir:        s.RootDirectory,
                ProtectKernelDefaults: s.ProtectKernelDefaults,
                NodeAllocatableConfig: cm.NodeAllocatableConfig{
                    KubeReservedCgroupName:   s.KubeReservedCgroup,
                    SystemReservedCgroupName: s.SystemReservedCgroup,
                    EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
                    KubeReserved:             kubeReserved,
                    SystemReserved:           systemReserved,
                    ReservedSystemCPUs:       reservedSystemCPUs,
                    HardEvictionThresholds:   hardEvictionThresholds,
                },
                QOSReserved:                           *experimentalQOSReserved,
                ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
                ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
                ExperimentalPodPidsLimit:              s.PodPidsLimit,
                EnforceCPULimits:                      s.CPUCFSQuota,
                CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
                ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
            },
            s.FailSwapOn,
            devicePluginEnabled,
            kubeDeps.Recorder)

        if err != nil {
            return err
        }
    }

    if err := checkPermissions(); err != nil {
        klog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        klog.Warning(err)
    }

    err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
        kubeDeps, &s.ContainerRuntimeOptions,
        s.ContainerRuntime,
        s.RuntimeCgroups,
        s.RemoteRuntimeEndpoint,
        s.RemoteImageEndpoint,
        s.NonMasqueradeCIDR)
    if err != nil {
        return err
    }

    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }

    // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
    if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
        kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
        if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
            return err
        }
    }

    if s.HealthzPort > 0 {
        mux := http.NewServeMux()
        healthz.InstallHandler(mux)
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
            if err != nil {
                klog.Errorf("Starting healthz server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    // If systemd is used, notify it that we have started
    go daemon.SdNotify(false, "READY=1")

    select {
    case <-done:
        break
    case <-stopCh:
        break
    }

    return nil
}

代码量较大,我们按步骤分析

1.设置全局门控特性

err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
    if err != nil {
    return err
}

2.kubelet参数合法性检测

if err := options.ValidateKubeletServer(s); err != nil {
    return err
}

检测内容:配置标识及门控特性

3.注册当前配置至/configz端点

err = initConfigz(&s.KubeletConfiguration)
if err != nil {
    klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}

4.检查kubelet启动模式是否为standalone模式

此模式下不会和api-server交互,主要用于kubelet的调试

standaloneMode := true
if len(s.KubeConfig) > 0 {
    standaloneMode = false
}

5.检测kubeDeps是否为空,为空则初始化

前文我们讲到,执行Run函数前已经初始化kubeDepskubeDeps是一个与运行时各种资源(网络、卷、容器运行时等)交互的接口集合对象。

if kubeDeps == nil {
    kubeDeps, err = UnsecuredDependencies(s, featureGate)
    if err != nil {
        return err
    }
}

if kubeDeps.Cloud == nil {
    if !cloudprovider.IsExternal(s.CloudProvider) {
        cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
        if err != nil {
            return err
        }
        if cloud == nil {
            klog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
        } else {
            klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
        }
        kubeDeps.Cloud = cloud
    }
}

6.获取主机名称

用于后续初始化事件记录器

  • 如果指定--cloud-provider,获取云主机节点名称。
  • 如果未指定--cloud-provider,并且指定了--hostname-override,返回--hostname-override值作为主机名
  • 如果未指定--cloud-provider--hostname-override,返回节点hostname
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
    return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
    return err
}

7.standalone模式下关闭所有客户端连接

switch {
case standaloneMode:
    kubeDeps.KubeClient = nil
    kubeDeps.EventClient = nil
    kubeDeps.HeartbeatClient = nil
    klog.Warningf("standalone mode, no API client")

case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
    clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
    if err != nil {
        return err
    }
    if closeAllConns == nil {
        return errors.New("closeAllConns must be a valid function other than nil")
    }
    kubeDeps.OnHeartbeatFailure = closeAllConns

    kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet client: %v", err)
    }

    // make a separate client for events
    eventClientConfig := *clientConfig
    eventClientConfig.QPS = float32(s.EventRecordQPS)
    eventClientConfig.Burst = int(s.EventBurst)
    kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet event client: %v", err)
    }

    // make a separate client for heartbeat with throttling disabled and a timeout attached
    heartbeatClientConfig := *clientConfig
    heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
    // The timeout is the minimum of the lease duration and status update frequency
    leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
    if heartbeatClientConfig.Timeout > leaseTimeout {
        heartbeatClientConfig.Timeout = leaseTimeout
    }

    heartbeatClientConfig.QPS = float32(-1)
    kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
    if err != nil {
        return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
    }
}

8.初始化身份认证接口

BuildAuth创建一个身份验证器、一个授权器,以及一个与kubelet需要兼容的匹配的授权器属性getter.

它返回一个AuthInterface认证接口,一个运行方法来启动内部控制器(如重新加载证书)和错误。

if kubeDeps.Auth == nil {
    auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
    if err != nil {
        return err
    }
    kubeDeps.Auth = auth
    runAuthenticatorCAReload(stopCh)
}

9.初始化cgroups

包含如下:

  • kubelet cgroups
  • 容器运行时cgroups
  • 系统cgroups
var cgroupRoots []string

cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
    klog.Warningf("failed to get the kubelet's cgroup: %v.  Kubelet system container metrics may be missing.", err)
} else if kubeletCgroup != "" {
    cgroupRoots = append(cgroupRoots, kubeletCgroup)
}

runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
    klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
} else if runtimeCgroup != "" {
    // RuntimeCgroups is optional, so ignore if it isn't specified
    cgroupRoots = append(cgroupRoots, runtimeCgroup)
}

if s.SystemCgroups != "" {
    // SystemCgroups is optional, so ignore if it isn't specified
    cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}

10.初始化cAdvisor

docker容器运行时内置cAdvisor获取容器指标数据

if kubeDeps.CAdvisorInterface == nil {
    imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
    kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
    if err != nil {
        return err
    }
}

11.初始化事件记录器,用于向kubelet端事件

// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

事件格式如下:

$ kubectl describe pod -n ddd portal-f6f4b4486-grhb7
...
Events:
  Type     Reason     Age                       From            Message
  ----     ------     ----                      ----            -------
  Warning  Unhealthy  7m49s (x22310 over 7d3h)  kubelet, node1  Liveness probe failed: Get http://10.233.90.203:7002/actuator/health: dial tcp 10.233.90.203:7002: connect: connection refused
  Warning  BackOff    2m49s (x27215 over 7d3h)  kubelet, node1  Back-off restarting failed container

12.初始化容器管理器

容器管理器主要用来管理容器:

1.如果开启--cgroups-per-qos,并且--cgroup-root未指定,cgroups的根为/
即启用基于QoSCgroup层次结构,所有的BurstableBestEffort类型pod都在它们特定的顶级QoS cgroup之下。

如:

$ ls /sys/fs/cgroup/cpu/kubepods.slice
cgroup.clone_children  cpuacct.usage_percpu_sys   cpu.rt_period_us           kubepods-pod347e1023_78aa_4aa6_a1bb_c11e60e995e1.slice
cgroup.procs           cpuacct.usage_percpu_user  cpu.rt_runtime_us          kubepods-podafe0da25_4a42_4a71_82c8_afcd7faf3b52.slice
cpuacct.stat           cpuacct.usage_sys          cpu.shares                 kubepods-pode61df7e6_b184_4c86_bd1e_734c818a4a1f.slice
cpuacct.usage          cpuacct.usage_user         cpu.stat                   notify_on_release
cpuacct.usage_all      cpu.cfs_period_us          kubepods-besteffort.slice  tasks
cpuacct.usage_percpu   cpu.cfs_quota_us           kubepods-burstable.slice

2.--reserved-cpus如果非空,初始化系统CPU预留资源。
--reserved-cpus被设置时,--system-reserved--kube-reserved将无效化。初始化逻辑如下:

a. 检测--reserved-cpus值合法性,如果非法则置空,避免--system-reserved--kube-reserved无效化

b. 检测是否可以从CAdvisor中获取主机信息,如果获取不了则置空--reserved-cpus

c. 检测--reserved-cpus值是否在宿主机CPU核数有效区间,非法则返回异常(如宿主机8核,指令预留12核,大于宿主机CPU实际核数)

d. 解析赋值容器管理器其他字段:

kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
    return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
    return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
    hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
    if err != nil {
        return err
    }
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
    return err
}

devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

kubeDeps.ContainerManager, err = cm.NewContainerManager(
    kubeDeps.Mounter,
    kubeDeps.CAdvisorInterface,
    cm.NodeConfig{
        RuntimeCgroupsName:    s.RuntimeCgroups,
        SystemCgroupsName:     s.SystemCgroups,
        KubeletCgroupsName:    s.KubeletCgroups,
        ContainerRuntime:      s.ContainerRuntime,
        CgroupsPerQOS:         s.CgroupsPerQOS,
        CgroupRoot:            s.CgroupRoot,
        CgroupDriver:          s.CgroupDriver,
        KubeletRootDir:        s.RootDirectory,
        ProtectKernelDefaults: s.ProtectKernelDefaults,
        NodeAllocatableConfig: cm.NodeAllocatableConfig{
            KubeReservedCgroupName:   s.KubeReservedCgroup,
            SystemReservedCgroupName: s.SystemReservedCgroup,
            EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
            KubeReserved:             kubeReserved,
            SystemReserved:           systemReserved,
            ReservedSystemCPUs:       reservedSystemCPUs,
            HardEvictionThresholds:   hardEvictionThresholds,
        },
        QOSReserved:                           *experimentalQOSReserved,
        ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
        ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
        ExperimentalPodPidsLimit:              s.PodPidsLimit,
        EnforceCPULimits:                      s.CPUCFSQuota,
        CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
        ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
    },
    s.FailSwapOn,
    devicePluginEnabled,
    kubeDeps.Recorder)

容器控制器初始化部分源码:

if kubeDeps.ContainerManager == nil {
    if s.CgroupsPerQOS && s.CgroupRoot == "" {
        klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
        s.CgroupRoot = "/"
    }

    var reservedSystemCPUs cpuset.CPUSet
    var errParse error
    if s.ReservedSystemCPUs != "" {
        reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
        if errParse != nil {
            // invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
            klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
            return errParse
        }
        // is it safe do use CAdvisor here ??
        machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
        if err != nil {
            // if can't use CAdvisor here, fall back to non-explicit cpu list behavor
            klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
            reservedSystemCPUs = cpuset.NewCPUSet()
        } else {
            reservedList := reservedSystemCPUs.ToSlice()
            first := reservedList[0]
            last := reservedList[len(reservedList)-1]
            if first < 0 || last >= machineInfo.NumCores {
                // the specified cpuset is outside of the range of what the machine has
                klog.Infof("Invalid cpuset specified by --reserved-cpus")
                return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
            }
        }
    } else {
        reservedSystemCPUs = cpuset.NewCPUSet()
    }

    if reservedSystemCPUs.Size() > 0 {
        // at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
        klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
        if s.KubeReserved != nil {
            delete(s.KubeReserved, "cpu")
        }
        if s.SystemReserved == nil {
            s.SystemReserved = make(map[string]string)
        }
        s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
        klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
    }
    kubeReserved, err := parseResourceList(s.KubeReserved)
    if err != nil {
        return err
    }
    systemReserved, err := parseResourceList(s.SystemReserved)
    if err != nil {
        return err
    }
    var hardEvictionThresholds []evictionapi.Threshold
    // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
    if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
        hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
        if err != nil {
            return err
        }
    }
    experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
    if err != nil {
        return err
    }

    devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

    kubeDeps.ContainerManager, err = cm.NewContainerManager(
        kubeDeps.Mounter,
        kubeDeps.CAdvisorInterface,
        cm.NodeConfig{
            RuntimeCgroupsName:    s.RuntimeCgroups,
            SystemCgroupsName:     s.SystemCgroups,
            KubeletCgroupsName:    s.KubeletCgroups,
            ContainerRuntime:      s.ContainerRuntime,
            CgroupsPerQOS:         s.CgroupsPerQOS,
            CgroupRoot:            s.CgroupRoot,
            CgroupDriver:          s.CgroupDriver,
            KubeletRootDir:        s.RootDirectory,
            ProtectKernelDefaults: s.ProtectKernelDefaults,
            NodeAllocatableConfig: cm.NodeAllocatableConfig{
                KubeReservedCgroupName:   s.KubeReservedCgroup,
                SystemReservedCgroupName: s.SystemReservedCgroup,
                EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
                KubeReserved:             kubeReserved,
                SystemReserved:           systemReserved,
                ReservedSystemCPUs:       reservedSystemCPUs,
                HardEvictionThresholds:   hardEvictionThresholds,
            },
            QOSReserved:                           *experimentalQOSReserved,
            ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
            ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
            ExperimentalPodPidsLimit:              s.PodPidsLimit,
            EnforceCPULimits:                      s.CPUCFSQuota,
            CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
            ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
        },
        s.FailSwapOn,
        devicePluginEnabled,
        kubeDeps.Recorder)

    if err != nil {
        return err
    }
}

13.检测是否以root用户运行kubelet

如果非root用户则返回异常。

if err := checkPermissions(); err != nil {
    klog.Error(err)
}
...
func checkPermissions() error {
    if uid := os.Getuid(); uid != 0 {
        return fmt.Errorf("kubelet needs to run as uid `0`. It is being run as %d", uid)
    }
    // TODO: Check if kubelet is running in the `initial` user namespace.
    // http://man7.org/linux/man-pages/man7/user_namespaces.7.html
    return nil
}

14.为kubelet进程设置OOM分数

即设置为--oom-score-adj的值,可选区间为[-1000, 1000],默认值为-999,并且该值越小越不容易被kill掉。

关于linux oom分数可以参考linux内核的oom score是咋算出来的

oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
    klog.Warning(err)
}

15.容器运行时初始化

  • 当容器运行时为docker时,初始化以下内容:
    • 网络插件名称(一般为cni
    • CIDR
    • CNI插件配置、缓存、二进制文件目录
    • MTU
    • 网桥模式
    • 创建启动CRI shim进程,作为连接kubelet与容器运行时间的桥梁
    • 设置是否使用cAdvisor采集容器指标数据

16.启动kubelet

if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
    return err
}

启动流程主要如下:

a. 初始化事件记录器

hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
    return err
}
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
    return err
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)

b. kubelet进程开启所有Linux CAP

capabilities.Initialize(capabilities.Capabilities{
    AllowPrivileged: true,
})

c. 初始化kubelet操作操作系统接口方法

if kubeDeps.OSInterface == nil {
    kubeDeps.OSInterface = kubecontainer.RealOS{}
}

接口如下:

type OSInterface interface {
    MkdirAll(path string, perm os.FileMode) error
    Symlink(oldname string, newname string) error
    Stat(path string) (os.FileInfo, error)
    Remove(path string) error
    RemoveAll(path string) error
    Create(path string) (*os.File, error)
    Chmod(path string, perm os.FileMode) error
    Hostname() (name string, err error)
    Chtimes(path string, atime time.Time, mtime time.Time) error
    Pipe() (r *os.File, w *os.File, err error)
    ReadDir(dirname string) ([]os.FileInfo, error)
    Glob(pattern string) ([]string, error)
    Open(name string) (*os.File, error)
    OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
    Rename(oldpath, newpath string) error
}

d. 创建初始化kubelet服务

初始化逻辑后续我们深入探讨

k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
      kubeDeps,
      &kubeServer.ContainerRuntimeOptions,
      kubeServer.ContainerRuntime,
      kubeServer.HostnameOverride,
      kubeServer.NodeIP,
      kubeServer.ProviderID,
      kubeServer.CloudProvider,
      kubeServer.CertDirectory,
      kubeServer.RootDirectory,
      kubeServer.RegisterNode,
      kubeServer.RegisterWithTaints,
      kubeServer.AllowedUnsafeSysctls,
      kubeServer.ExperimentalMounterPath,
      kubeServer.ExperimentalKernelMemcgNotification,
      kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
      kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
      kubeServer.MinimumGCAge,
      kubeServer.MaxPerPodContainerCount,
      kubeServer.MaxContainerCount,
      kubeServer.MasterServiceNamespace,
      kubeServer.RegisterSchedulable,
      kubeServer.KeepTerminatedPodVolumes,
      kubeServer.NodeLabels,
      kubeServer.SeccompProfileRoot,
      kubeServer.BootstrapCheckpointPath,
      kubeServer.NodeStatusMaxImages)
if err != nil {
    return fmt.Errorf("failed to create kubelet: %v", err)
}

// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
    return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig

e. 设置kubelet进程最大文件打开数

rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

f. 启动kubelet服务

// process pods and exit.
if runOnce {
    if _, err := k.RunOnce(podCfg.Updates()); err != nil {
        return fmt.Errorf("runonce failed: %v", err)
    }
    klog.Info("Started kubelet as runonce")
} else {
    startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
    klog.Info("Started kubelet")
}

17.如果开启动态配置,则监听动态配置中的配置变化

// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
    kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
    if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
        return err
    }
}

18.开启/healthz端点

if s.HealthzPort > 0 {
    mux := http.NewServeMux()
    healthz.InstallHandler(mux)
    go wait.Until(func() {
        err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
        if err != nil {
            klog.Errorf("Starting healthz server failed: %v", err)
        }
    }, 5*time.Second, wait.NeverStop)
}

19.通知init进程kubelet服务启动完毕

if s.RunOnce {
    return nil
}

// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容