深入分析kube-batch(2)——cache
熟悉K8S的同学,一定对cache机制不陌生;在之前启动过程一文中分析过,cache的作用,本文将详细分析cache的实现。
struct
interface
kube-batch\pkg\scheduler\cache\interface.go
// Cache collects pods/nodes/queues information
// and provides information snapshot
type Cache interface {
    // Run start informer
    Run(stopCh <-chan struct{})
    // Snapshot deep copy overall cache information into snapshot
    Snapshot() *api.ClusterInfo
    // SchedulerConf return the property of scheduler configuration
    LoadSchedulerConf(path string) (map[string]string, error)
    // WaitForCacheSync waits for all cache synced
    WaitForCacheSync(stopCh <-chan struct{}) bool
    // Bind binds Task to the target host.
    // TODO(jinzhej): clean up expire Tasks.
    Bind(task *api.TaskInfo, hostname string) error
    // Evict evicts the task to release resources.
    Evict(task *api.TaskInfo, reason string) error
    // Backoff puts job in backlog for a while.
    Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error
}
type Binder interface {
    Bind(task *v1.Pod, hostname string) error
}
type Evictor interface {
    Evict(pod *v1.Pod) error
}
重点关注接口的Run/Snaoshot
implements
具体的实现在
kube-batch\pkg\scheduler\cache\cache.go
type SchedulerCache struct {
   sync.Mutex
   kubeclient *kubernetes.Clientset
   arbclient  *versioned.Clientset
   podInformer      infov1.PodInformer
   nodeInformer     infov1.NodeInformer
   pdbInformer      policyv1.PodDisruptionBudgetInformer
   nsInformer       infov1.NamespaceInformer
   podGroupInformer arbcoreinfo.PodGroupInformer
   queueInformer    arbcoreinfo.QueueInformer
   Binder  Binder
   Evictor Evictor
   recorder record.EventRecorder
   Jobs   map[arbapi.JobID]*arbapi.JobInfo
   Nodes  map[string]*arbapi.NodeInfo
   Queues map[arbapi.QueueID]*arbapi.QueueInfo
   errTasks    *cache.FIFO
   deletedJobs *cache.FIFO
   namespaceAsQueue bool
}
SchedulerCache主要由以下组件组成:
- 锁,解决快照与内存一致性问题
 - K8S clients,访问apiserver
 - Informers,ListWatch REST
 - Jobs/Nodes/Queues,缓存REST
 
new
newSchedulerCache函数代码比较多,就不都贴了。我们可以关注各个Informer的事件注册,其中最重要的就是Pod/PodGroup相关的事件处理。
Pod
sc.podInformer.Informer().AddEventHandler(
   cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
         switch obj.(type) {
         case *v1.Pod:
            pod := obj.(*v1.Pod)
            if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
               return true
            }
            return pod.Status.Phase == v1.PodRunning
         default:
            return false
         }
      },
      Handler: cache.ResourceEventHandlerFuncs{
         AddFunc:    sc.AddPod,
         UpdateFunc: sc.UpdatePod,
         DeleteFunc: sc.DeletePod,
      },
   })
   
这里可以看到,kube-batch只关心需要自己调度,并且Pending的Pod;以及Running的Pod。
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) AddPod(obj interface{}) {
    sc.Mutex.Lock()
    defer sc.Mutex.Unlock()
    err := sc.addPod(pod)
}
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
    pi := arbapi.NewTaskInfo(pod)
    return sc.addTask(pi)
}
全局一把锁,以后会是性能瓶颈。这里我们看到kube-batch会将Pod转换成TaskInfo缓存起来。
kube-batch\pkg\scheduler\api\job_info.go
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
   req := EmptyResource()
   // TODO(k82cn): also includes initContainers' resource.
   for _, c := range pod.Spec.Containers {
      req.Add(NewResource(c.Resources.Requests))
   }
   ti := &TaskInfo{
      UID:       TaskID(pod.UID),
      Job:       getJobID(pod),
      Name:      pod.Name,
      Namespace: pod.Namespace,
      NodeName:  pod.Spec.NodeName,
      Status:    getTaskStatus(pod),
      Priority:  1,
      Pod:    pod,
      Resreq: req,
   }
   if pod.Spec.Priority != nil {
      ti.Priority = *pod.Spec.Priority
   }
   return ti
}
转换过程比较简单,注意两点:
- 需要统计资源请求量
 - JobID通过
pod.Annotations[arbcorev1.GroupNameAnnotationKey]或者所属的controller 
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
   if len(pi.Job) != 0 {
      if _, found := sc.Jobs[pi.Job]; !found {
         sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
      }
      sc.Jobs[pi.Job].AddTaskInfo(pi)
   }
}
kube-batch\pkg\scheduler\api\job_info.go
func NewJobInfo(uid JobID) *JobInfo {
   return &JobInfo{
      UID: uid,
      MinAvailable: 0,
      NodeSelector: make(map[string]string),
      Allocated:    EmptyResource(),
      TotalRequest: EmptyResource(),
      TaskStatusIndex: map[TaskStatus]tasksMap{},
      Tasks:           tasksMap{},
   }
}
func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
    ji.Tasks[ti.UID] = ti
    ji.addTaskIndex(ti)
    ji.TotalRequest.Add(ti.Resreq)
}
func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
    if _, found := ji.TaskStatusIndex[ti.Status]; !found {
        ji.TaskStatusIndex[ti.Status] = tasksMap{}
    }
    ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}
最终task会归于一个job,job主要保存tasks,资源请求总量等信息。
PodGroup
sc.podGroupInformer = arbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   AddFunc:    sc.AddPodGroup,
   UpdateFunc: sc.UpdatePodGroup,
   DeleteFunc: sc.DeletePodGroup,
})
kube-batch\pkg\scheduler\cache\event_handlers.go
func (sc *SchedulerCache) AddPodGroup(obj interface{}) {
   sc.Mutex.Lock()
   defer sc.Mutex.Unlock()
   err := sc.setPodGroup(ss)
}
func (sc *SchedulerCache) setPodGroup(ss *arbv1.PodGroup) error {
    job := getJobID(ss)
    if _, found := sc.Jobs[job]; !found {
        sc.Jobs[job] = arbapi.NewJobInfo(job)
    }
    sc.Jobs[job].SetPodGroup(ss)
    return nil
}
func getJobID(pg *arbv1.PodGroup) arbapi.JobID {
    return arbapi.JobID(fmt.Sprintf("%s/%s", pg.Namespace, pg.Name))
}
这里我们可以看到Job就是PodGroup
kube-batch\pkg\scheduler\api\job_info.go
func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
   ji.Name = pg.Name
   ji.Namespace = pg.Namespace
   ji.MinAvailable = pg.Spec.MinMember
   if len(pg.Spec.Queue) == 0 {
      ji.Queue = QueueID(pg.Namespace)
   } else {
      ji.Queue = QueueID(pg.Spec.Queue)
   }
   ji.PodGroup = pg
}
重点关注ji.MinAvailable = pg.Spec.MinMember
run
func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
   go sc.pdbInformer.Informer().Run(stopCh)
   go sc.podInformer.Informer().Run(stopCh)
   go sc.nodeInformer.Informer().Run(stopCh)
   go sc.podGroupInformer.Informer().Run(stopCh)
   if sc.namespaceAsQueue {
      go sc.nsInformer.Informer().Run(stopCh)
   } else {
      go sc.queueInformer.Informer().Run(stopCh)
   }
   // Re-sync error tasks.
   go sc.resync()
   // Cleanup jobs.
   go sc.cleanupJobs()
}
run方法比较简单,主要负责:
- 开始各个REST的ListWatch
 - 根据errTasks队列,重新同步Pod状态
 - 根据deletedJobs队列,清理缓存
 
Snapshot
func (sc *SchedulerCache) Snapshot() *arbapi.ClusterInfo {
   sc.Mutex.Lock()
   defer sc.Mutex.Unlock()
   snapshot := &arbapi.ClusterInfo{
      Nodes:  make([]*arbapi.NodeInfo, 0, len(sc.Nodes)),
      Jobs:   make([]*arbapi.JobInfo, 0, len(sc.Jobs)),
      Queues: make([]*arbapi.QueueInfo, 0, len(sc.Queues)),
      Others: make([]*arbapi.TaskInfo, 0, 10),
   }
   for _, value := range sc.Nodes {
      snapshot.Nodes = append(snapshot.Nodes, value.Clone())
   }
   for _, value := range sc.Queues {
      snapshot.Queues = append(snapshot.Queues, value.Clone())
   }
   for _, value := range sc.Jobs {
      // If no scheduling spec, does not handle it.
      if value.PodGroup == nil && value.PDB == nil {
         continue
      }
      snapshot.Jobs = append(snapshot.Jobs, value.Clone())
   }
   return snapshot
}
利用Deep Clone dump cache ,唯一需要注意的是必须要创建PodGroup,才能继续调度。