1. 前言
转载请说明原文出处, 尊重他人劳动成果!
源码位置: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/podgc
分支: tming-v1.13 (基于v1.13版本)
关于各类
controller
都会用到informers
, 所以关于informers
, 可以参考 [k8s源码分析][client-go] informer之SharedInformerFactory.
本文分析的是
gc_controller
, 就是处理pod
的垃圾回收的控制器, 是各种controller
中属于较简单的其中一个了, 因为它的逻辑比较简单.
2. PodGCController
2.1 启动
关于
kube-controller-manager
组件整体的运行会有专门博客介绍, 这里直接看一下podgc
这个controller
是如何启动的.
// cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
...
controllers["podgc"] = startPodGCController
...
return controllers
}
// cmd/kube-controller-manager/app/core.go
unc startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(),
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return nil, true, nil
}
可以看到生成一个
PodGCController
对象, 然后以goroutine
的方式启动它的Run
方法.
2.2 PodGCController
// pkg/controller/podgc/gc_controller.go
const (
gcCheckPeriod = 20 * time.Second
)
type PodGCController struct {
kubeClient clientset.Interface
// 获得本地缓存的Lister
podLister corelisters.PodLister
// 同步函数
podListerSynced cache.InformerSynced
// 删除pod的方法
deletePod func(namespace, name string) error
// 一个阈值
terminatedPodThreshold int
}
func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, terminatedPodThreshold int) *PodGCController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
gcc := &PodGCController{
kubeClient: kubeClient,
terminatedPodThreshold: terminatedPodThreshold,
deletePod: func(namespace, name string) error {
klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name)
return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
},
}
// 获得本地缓存
gcc.podLister = podInformer.Lister()
// 等待同步函数
gcc.podListerSynced = podInformer.Informer().HasSynced
return gcc
}
func (gcc *PodGCController) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting GC controller")
defer klog.Infof("Shutting down GC controller")
// 等待同步
if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
return
}
// 执行gc方法
go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop
}
主要是每隔
20
秒执行gc
方法.
gc
func (gcc *PodGCController) gc() {
// 获得本地缓存的所有的pods
pods, err := gcc.podLister.List(labels.Everything())
if err != nil {
klog.Errorf("Error while listing all Pods: %v", err)
return
}
// 代表集群中最多留terminatedPodThreshold个terminating的pod
if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods)
}
// 删除那些在失联的节点上的pods (某些节点被删除了或者失联了需要删除在这些节点上的pods)
gcc.gcOrphaned(pods)
// 删除那些还没有被分配到任何节点但是已经在terminating的pods
gcc.gcUnscheduledTerminating(pods)
}
关于
terminating
的定义如下:
func isPodTerminated(pod *v1.Pod) bool {
if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
return true
}
return false
}
gc
主要做三件事情:
1. 删除那些terminating
的pods
. (集群中最多留terminatedPodThreshold
个terminating pod
.)
2. 删除那些在失联的节点上的pods
(某些节点被删除了或者失联了需要删除在这些节点上的pods
)
3. 删除那些还没有被分配到任何节点但是已经在terminating
的pods
.
gcTerminated
func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
terminatedPods := []*v1.Pod{}
// 过滤出所有的terminating的pods
for _, pod := range pods {
if isPodTerminated(pod) {
terminatedPods = append(terminatedPods, pod)
}
}
terminatedPodCount := len(terminatedPods)
sort.Sort(byCreationTimestamp(terminatedPods))
deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
// 就是最多留terminatedPodThreshold个terminated的pods 其余的都要删除
if deleteCount > terminatedPodCount {
deleteCount = terminatedPodCount
}
if deleteCount > 0 {
klog.Infof("garbage collecting %v pods", deleteCount)
}
// 删除pods
var wait sync.WaitGroup
for i := 0; i < deleteCount; i++ {
wait.Add(1)
go func(namespace string, name string) {
defer wait.Done()
if err := gcc.deletePod(namespace, name); err != nil {
// ignore not founds
defer utilruntime.HandleError(err)
}
}(terminatedPods[i].Namespace, terminatedPods[i].Name)
}
wait.Wait()
}
作用是计算出要删除的
terminating
的pods
个数. 然后做删除操作.
gcOrphaned
func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
klog.V(4).Infof("GC'ing orphaned")
// We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
// 取出本地缓存的节点
nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return
}
nodeNames := sets.NewString()
for i := range nodes.Items {
nodeNames.Insert(nodes.Items[i].Name)
}
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if nodeNames.Has(pod.Spec.NodeName) {
continue
}
// 删除那些在失联的节点上的pods (某些节点被删除了或者失联了需要删除在这些节点上的pods)
klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName)
if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
utilruntime.HandleError(err)
} else {
klog.V(0).Infof("Forced deletion of orphaned Pod %v/%v succeeded", pod.Namespace, pod.Name)
}
}
}
找到那些绑定在已经不存在的节点上的
pods
并删除.
gcUnscheduledTerminating
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")
for _, pod := range pods {
if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
continue
}
// 表明 pod.DeletionTimestamp != nil && len(pod.Spec.NodeName) <= 0
// 删除那些还没有被分配到任何节点但是已经在terminating的pods
klog.V(2).Infof("Found unscheduled terminating Pod %v/%v not assigned to any Node. Deleting.", pod.Namespace, pod.Name)
if err := gcc.deletePod(pod.Namespace, pod.Name); err != nil {
utilruntime.HandleError(err)
} else {
klog.V(0).Infof("Forced deletion of unscheduled terminating Pod %v/%v succeeded", pod.Namespace, pod.Name)
}
}
}
删除那些还没有分配并且已经在
terminating
的pods
.
3. 总结
可以看到该
controller
就是定期去删除一些集群中认为没有用的pods
.