在 startKubelet 方法中开始运行创建好的kubelet。kubelet可以只运行一次,即RunOnce,然后立即退出,也可以作为一个后台daemon长期运行,调用startKubelet;
if runOnce {
k.RunOnce(podCfg.Updates())
} else {
startKubelet(k, podCfg, kubeCfg, kubeDeps)
}
startKubelet主要启动一个goroutine,执行k.Run(podCfg.Updates()),其中podCfg.Updates()返回的是kubetypes.PodUpdate channel,从这个channel里面我们可以获取到Pod信息;
运行 kubelet
pkg/kubelet/kubelet.go --> Run 方法
在检测到配置更新的时候,kubelet 会开始执行Run方法。
1. 初始化一些不需要容器运行时就可以启动的模块
setupDataDirs() // 设置文件系统路径
os.MkdirAll(ContainerLogsDir, 0755) // 创建容器日志路径
imageManager.Start() // 镜像管理
serverCertificateManager.Start() // 证书管理
containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService) // 容器管理
oomWatcher.Start(kl.nodeRef) // 监控oom
gpuManager.Start() // gpu
resourceAnalyzer.Start() // 资源监控
2. 启动 volumeManager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
3. 启动kl.syncNodeStatus,主要是每隔kl.nodeStatusUpdateFrequency会向kube-apiserver更新本节点的信息;
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
4. 启动kl.syncNetworkStatus,每隔30s会更新networkplugin状态;
go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
5. 启动kl.updateRuntimeUp,每隔5s会调用kl.setRuntimeSync,同步container runtime(即docker或rkt)的信息;
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
6. 启动kl.syncNetworkUtil,每隔1m会同步iptables信息;
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
7. 启动podKiller,负责杀死不要的Pod;
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
8. 检查dns limits
go wait.Until(func() { kl.dnsConfigurer.CheckLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
9. 同时启动了几个重要的manager,即kl.statusManager.Start、kl.probeManager.Start、kl.pleg.Start;
kl.statusManager.Start()
kl.probeManager.Start()
kl.pleg.Start()
10. 最后启动kl.syncLoop(updates, kl);
kl.syncLoop(updates, kl)
main Loop
pkg/kubelet/kubelet.go --> syncLoop 方法
创建了两个定时器: syncTicker 和 housekeepingTicker,即使没有需要更新的 pod 配置,kubelet 也会定时去做同步和清理工作。
syncTicker := time.NewTicker(time.Second)
housekeepingTicker := time.NewTicker(housekeepingPeriod)
如果在每次循环过程中出现比较严重的错误,kubelet 会记录到 runtimeState 中,遇到错误就等待 5 秒中继续循环。
创建一个for循环不停的执行syncLoopIteration,获取的Pod信息;kubelet通过三种方式获取Pod信息,一种是传统的通过watch kube-apiserver获取pod信息,一种是通过文件获取,最后一种是通过http获取,后两种模式即 standalone 模式;(注意第二个参数变成了 SyncHandler 类型,这是一个 interface,定义了处理不同情况的接口,这个Handler其实就是 kl 实例本身)
kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
syncLoopIteration
configCh: 将pods的配置变动信息,根据变动的类型,分发给不同的方法进行处理。
plegCh: PLEG 状态,如果 pod 的状态发生改变(因为某些情况被杀死,被暂停等),kubelet 也要做处理
syncCh: 定时器管道,每次隔一段事件去同步最新保存的 pod 状态
houseKeepingCh: 触发 Pods 的清理
liveness manager: 同步 Pods 失败或者有容器死掉了。健康检查发现某个 pod 不可用,一般也要对它进行重启
Pod 信息处理主要由这个函数进行处理,configCh这个channel就是 podcfg 的updates,即Pod信息的来源;
从 configCh 读取Pod信息,然后根据u.Op为ADD、UPDATE、REMOVE、RECONCILE进行处理,分别调用handler.HandlePodAdditions(u.Pods)、handler.HandlePodUpdates(u.Pods)、handler.HandlePodDeletions(u.Pods)、handler.HandlePodReconcile(u.Pods);
op.RESTORE 的时候,不能将 Pod 标记为准备就绪。因为一旦准备就绪了,那么清理routines就会开始回收资源;
select {
case u, open := <- configCh:
switch u.Op {
case kubetypes.ADD: handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE: handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE: handler.HandlePodRemoves(u.Pods)
case.kubetypes.RECONCILE: handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE: handler.HandlePodUpdates(u.Pods)
case.kubetypes.RESTORE: handler.HandlePodAdditions(u.Pods)
}
plegCh 这个channel 是处理 PLEG 状态, 执行同步,或者清理操作。
select {
case e := <- plegCh:
if isSyncPodWorthy(e) {
handler.HandlePodSyncs([]*v1.Pod{pod})
}
if e.Type == pleg.ContainerDied {
kl.cleanUpContainersInPod(e.ID, containerID)
}
syncCh 这个channel 处理同步操作
select {
case <-syncCh:
handler.HandlePodSyncs(podsToSync)
liveness manager 同步容器不健康信息, 如果 Pod 已经不存在了,那么直接返回
select {
case update := <-kl.livenessManager.Updates():
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
break
}
handler.HandlePodSyncs([]*v1.Pod{pod})
houseKeepingCh 这个channel 处理清理操作
select {
case <-houseKeepingCh:
if kl.sourcesReady.AllReady() {
handler.HandlePodCleanups()
}
重新总结一些kubelet的流程
1. 汇总:kubelet 的主循环方法,它从不同的管道(文件、URL 和 apiserver)监听变化,并把它们汇聚起来,到一个 channel 上面。
2. 初审:当有新的变化发生时,它会调用对应的处理函数,交给podWorkers处理,保证 pod 处于期望的状态。如果 pod 没有变化,它也会定期保证所有的容器和最新的期望状态保持一致。
3. 接待:podWorkers 对每个 Pod 的 workUpdata 事件排队,并负责更新 Cache 中的 Pod 状态,再把具体的任务转给 kubelet 处理 (syncPod 方法)。
4. 终审:kubelet 对符合条件的 Pod 进一步进行审核,例如检查是否可以在本节点运行,对于符合审查的 Pod 做一些准备工作,譬如目录创建,PV创建,Image 获取,处理Mirror Pod等,最后再把工作转给容器那一层。
5. 落地:任务到容器层以后,它负责处理Pod的情况,查看沙盒,是否重启等。最终再调用 startContainer 启动容器。