kubelet的主要工作是创建、销毁Pod。kubelet需要对Pod资源进行watch,kubelet会 watch想要watch的Pod是那些已经分配到本节点的Pods。本篇文章将从代码实现探究一下kubelet对pod资源的watch实现。
本次分析是基于kubernetes tag v1.9.0
kubelet对pod资源的watch
入口程序
pkg/kubelet/kubelet.go NewMainKubelet()
1. NewMainKubelet()将会调用makePodSourceConfig()方法;
2. 在makePodSourceConfig()方法中,pod资源以三种方式获得:第一种是通过文件获得,文件一般放在/etc/kubernetes/manifests目录下面,启动kubelet时可以通过指定–config覆盖,第二种也是通过文件过得,只不过文件是通过URL获取的,URL可以在启动kubelet时通过ManifestURL指定,第三种是通过watch kube-apiserver获取。其中前两种模式下,我们称kubelet运行在standalone模式下;我们将看看第三种方式的代码实现;
NewSourceApiserver
pkg/kubelet/config/apiserver.go NewSourceApiserver()
1. 将会调用NewListWatchFromClient()方法;
2. 将会调用NewSourceApiserverFromLW()方法;
NewListWatchFromClient
client-go/tools/cache/listwatch.go NewListWatchFromClient()
1. NewListWatchFromClient()方法将返回一个ListWatch结构体,包含listFunc和watchFunc;
2. listFunc用于List,watchFunc用于Watch;
3. ListWatch会传入NewSourceApiserverFromLW()方法;
NewSourceApiserverFromLW
pkg/kubelet/config/apiserver.go NewSourceApiserverFromLW()
1. 首先创建了一个send函数,它将从apiserver获取的pods传送到updates channel中;
2. 这些Pods是从哪里获取的了?是通过构建一个reflector,然后run获得Pods信息;
Reflector-获取Pods信息
client-go/tools/cache/reflector.go NewReflector()
1. Reflector对象,主要数据成员:ListerWatcher,ListerWatcher是接口对象,包括方法List()和Watch();这里,ListWatch对象将作为Reflector对象的ListerWatcher数据成员;
Reflector-执行ListAndWatch
client-go/tools/cache/reflector.go Run()
1. 启动协程执行执行reflector的ListAndWatch()方法;
2. ListAndWatch()方法
(1) ListAndWatch()方法在之前的文章中的kube-apiserver对etcd的watch机制已经介绍过。
(2) 调用ListFunc和WatchFunc去List和Watch apiserver的资源;
(3) 并调用reflector的watchHandler()方法;
listFunc/watchFunc
1. listFunc首先Get(),说明发起一个Get请求,再看看Namespace(namespace),其实只是在request设置namespace字段,再看看Resource函数,设置request的resource字段,VersionedParams主要对options进行序列化,options主要包括ResourceVersion和TimeoutSeconds这两个参数,FieldsSelectorParam函数主要将filter函数进行序列化,深入分析这个函数,你可以发现其实他将他序列化到一个嵌套的map里面。Do()函数发起真正的请求,并收到response,然后用r.transformResponse去处理response,包装成Result返回。Get()函数则主要对Result进行反序列化。最后返回结果。
2. watchFunc很大一部分与ListFunc重合了,比如Get()、Namespace(namespace)、Resource(resource)、VersionedParams(&options, api.ParameterCodec),有两个比较特殊的函数Prefix(“watch”)和Watch()。Prefix(“watch”)主要在pathPrefix的结尾增加了watch字段,用来和List请求区分。
3. Watch()方法将返回watch.Interface,这个watch.Interface专门用来传送kubelet想要watch的资源。Watch首先会发起一个request,然后反序列化response,从response中获得watch.Interface。
reflector.watchHandler
client-go/tools/cache/reflector.go watchHandler()
1. 从channel读取event,然后更新到r.store;
2. r.store是什么呢?创建reflector时,定义了cache的类型为UndeltaStore;
UndeltaStore
client-go/tools/cache/undelta_store.go Add()/Update()/Delete()/Replace()
1. 执行u.PushFunc(u.Store.List())操作,PushFunc函数就cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)里面的send函数,它将从apiserver获取的pods传送到updates channel中,kubelet从updates这个channel获取到Pod信息进行处理。
2. 这里无论是add、delete或者modify, u.PushFunc(u.Store.List())他会发送存储的所有pods。因为在这些操作之前,它都会先操作Store里面的pods对象,确保Store里面存储的是分配到该节点的Pod的最新信息。至于kubelet怎么处理获取的pods信息,这就不属于watch机制该讨论的范畴了。
相关阅读:
1. http://licyhust.com/