注册
cloudhub.Register()
edgecontroller.Register()
devicecontroller.Register()
EdgeController
EdgeController是负责监听api-server的请求的,应该是cloudcore的入口。
edgecontroller.Register()
Register首先通过InitConfigure()初始化了一些配置信息,然后调用beehive的Register,最后调用Start
edgecontroller.Start()
Start方法分别创建了一个UpstreamController和一个DownstreamController,然后分别调用各自的Start方法
DownstreamController
NewDownstreamController
// NewDownstreamController assembles a DownstreamController: it obtains a kube
// client, creates the pod, configmap, secret, node, service and endpoints
// managers plus the message layer, and finally runs initLocating.
//
// When EdgeSiteEnabled is set in the configuration, KubeNodeName must be
// non-empty and is passed to the pod manager as its node name; otherwise an
// empty node name is used. The first failing step aborts construction and its
// error is returned with a nil controller.
func NewDownstreamController() (*DownstreamController, error) {
	cli, err := utils.KubeClient()
	if err != nil {
		klog.Warningf("create kube client failed with error: %s", err)
		return nil, err
	}

	// The node name stays empty unless edge-site mode is enabled.
	nodeName := ""
	if config.Get().EdgeSiteEnabled {
		nodeName = config.Get().KubeNodeName
		if nodeName == "" {
			return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
		}
	}

	dc := &DownstreamController{
		kubeClient: cli,
		lc:         &manager.LocationCache{},
	}

	dc.podManager, err = manager.NewPodManager(cli, v1.NamespaceAll, nodeName)
	if err != nil {
		klog.Warningf("create pod manager failed with error: %s", err)
		return nil, err
	}

	dc.configmapManager, err = manager.NewConfigMapManager(cli, v1.NamespaceAll)
	if err != nil {
		klog.Warningf("create configmap manager failed with error: %s", err)
		return nil, err
	}

	dc.secretManager, err = manager.NewSecretManager(cli, v1.NamespaceAll)
	if err != nil {
		klog.Warningf("create secret manager failed with error: %s", err)
		return nil, err
	}

	dc.nodeManager, err = manager.NewNodesManager(cli, v1.NamespaceAll)
	if err != nil {
		klog.Warningf("Create nodes manager failed with error: %s", err)
		return nil, err
	}

	dc.serviceManager, err = manager.NewServiceManager(cli, v1.NamespaceAll)
	if err != nil {
		klog.Warningf("Create service manager failed with error: %s", err)
		return nil, err
	}

	dc.endpointsManager, err = manager.NewEndpointsManager(cli, v1.NamespaceAll)
	if err != nil {
		klog.Warningf("Create endpoints manager failed with error: %s", err)
		return nil, err
	}

	dc.messageLayer, err = messagelayer.NewMessageLayer()
	if err != nil {
		klog.Warningf("create message layer failed with error: %s", err)
		return nil, err
	}

	err = dc.initLocating()
	if err != nil {
		return nil, err
	}
	return dc, nil
}
LocationCache 是保存node, service,pod, configmap, secret等的一个字典;
KubeClient就是调用的K8S的go client;EdgeSiteEnabled默认是false,猜测这个配置是让cloud也作为edge节点来用的;
后面就是启动各种XXManager。
看一个PodManager
PodManager
NewPodManager
// NewPodManager creates a PodManager that list-watches pods in namespace.
//
// An empty nodeName selects every pod; otherwise only pods whose
// spec.nodeName equals nodeName are watched. Raw informer events go to a
// buffered channel that the merge goroutine consumes. Both channels are sized
// by the PodEventBuffer configuration value. The returned error is always nil.
func NewPodManager(kubeClient *kubernetes.Clientset, namespace, nodeName string) (*PodManager, error) {
	// Default to all pods and narrow to a single node only when one is named.
	selector := fields.Everything()
	if nodeName != "" {
		selector = fields.OneTermEqualSelector("spec.nodeName", nodeName)
	}
	listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)

	bufSize := config.Get().PodEventBuffer
	realEvents := make(chan watch.Event, bufSize)
	mergedEvents := make(chan watch.Event, bufSize)

	informer := cache.NewSharedInformer(listWatch, &v1.Pod{}, 0)
	informer.AddEventHandler(NewCommonResourceEventHandler(realEvents))

	manager := &PodManager{
		realEvents:   realEvents,
		mergedEvents: mergedEvents,
	}

	// The stop channel is never closed, so the informer is never asked to stop.
	go informer.Run(make(chan struct{}))
	go manager.merge()

	return manager, nil
}
这里就是使用K8S go client的ListWatch接口,注意nodeName默认是传入的空字符串,所以这里list的是所有pod。
realEvents和mergedEvents是两个带缓存的通道。
// CommonResourceEventHandler turns informer callbacks into watch events and
// publishes them on a channel.
type CommonResourceEventHandler struct {
// events is the channel that obj2Event sends converted watch events to.
events chan watch.Event
}
// obj2Event wraps obj in a watch.Event of type t and sends it on c.events.
// Objects that do not implement runtime.Object are logged and dropped.
func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
	if runtimeObj, isRuntimeObj := obj.(runtime.Object); isRuntimeObj {
		c.events <- watch.Event{Type: t, Object: runtimeObj}
		return
	}
	klog.Warningf("unknown type: %T, ignore", obj)
}
这里只列出了handler的obj2Event方法,它把某个类型的event放到通道中(handler要能传给AddEventHandler,还需要实现OnAdd/OnUpdate/OnDelete回调,这里没有贴出)。SharedInformer和AddEventHandler是go client的标准用法,就不仔细看了。
看一下merge方法
merge()
// merge consumes raw pod events from pm.realEvents, keeps pm.pods in sync and
// forwards the events that matter on pm.mergedEvents. It returns once
// pm.realEvents is closed.
//
//   - Added: the pod is cached and the event forwarded; if the pod already
//     carries a DeletionTimestamp the event is forwarded as Modified.
//   - Deleted: the pod is removed from the cache and the event forwarded.
//   - Modified: the cache entry is replaced; the event is forwarded only if
//     the pod was not cached before or isPodUpdated reports a change.
//
// Events whose object is not a *v1.Pod, or whose type is none of the above,
// are logged and skipped.
func (pm *PodManager) merge() {
	for re := range pm.realEvents {
		// A checked assertion keeps one unexpected object from panicking this goroutine.
		pod, ok := re.Object.(*v1.Pod)
		if !ok {
			klog.Warningf("object type: %T unsupported", re.Object)
			continue
		}

		switch re.Type {
		case watch.Added:
			pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
			if pod.DeletionTimestamp != nil {
				re.Type = watch.Modified
			}
			pm.mergedEvents <- re
		case watch.Deleted:
			pm.pods.Delete(pod.UID)
			pm.mergedEvents <- re
		case watch.Modified:
			// Read the previous entry before overwriting it so the comparison
			// is made against the old state.
			previous, found := pm.pods.Load(pod.UID)
			pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
			if !found || pm.isPodUpdated(previous.(*CachePod), pod) {
				pm.mergedEvents <- re
			}
		default:
			klog.Warningf("event type: %s unsupported", re.Type)
		}
	}
}
merge方法遍历所有real event(即watch api server得到的event),然后根据event的类型:添加、删除、更改进行不同的操作。
对于添加(Added)事件,先把pod保存到pm.pods中,然后把event放到mergedEvents通道中(如果pod已经带有DeletionTimestamp,说明它正处于被删除的过程中,此时把type改为Modified再转发);对于删除(Deleted)事件,先从pm.pods中删掉这个pod,然后把event放到pm.mergedEvents中;对于修改(Modified)事件,先取出缓存中原来的pod,再用新的CachePod对象覆盖pm.pods中的记录,然后比较pod是否真的被修改:如果修改了(或者缓存中原本没有这个pod),则把event放到pm.mergedEvents中,否则就跳过这个event。
Events()
Events返回的是pm.mergedEvents,也就是已经处理过的event通道
回到NewDownstreamController中,最后再调用initLocating
initLocating()
先要取出edge node上label的key,默认是node-role.kubernetes.io/edge,后续list的时候,要根据这个key进行查找
然后遍历node,放到locationCache的EdgeNodes中,然后查询出所有在Edge节点上的Pod、ConfigMap、Secret等信息保存到locationCache中。
Start()
// Start launches one goroutine per resource sync loop (pod, configmap,
// secret, edge nodes, service, endpoints) and returns nil immediately.
func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")

	syncLoops := []func(){
		dc.syncPod,       // pod
		dc.syncConfigMap, // configmap
		dc.syncSecret,    // secret
		dc.syncEdgeNodes, // nodes
		dc.syncService,   // service
		dc.syncEndpoints, // endpoints
	}
	for i := range syncLoops {
		go syncLoops[i]()
	}

	return nil
}
DownstreamController的Start方法就起了6个syncXXX的goroutine。这里具体看一下syncPod
syncPod
// syncPod forwards pod events from the pod manager to the message layer until
// the beehive context is done.
//
// For each event it:
//  1. skips objects that are not *v1.Pod and pods whose node is not an edge
//     node according to the location cache;
//  2. builds the message resource from the pod's node name, namespace and name;
//  3. sets the router operation from the event type (insert, delete or
//     update), updating the location cache on add and modify;
//  4. sends the message and logs the outcome.
//
// Events with an unsupported type are logged and dropped without sending.
func (dc *DownstreamController) syncPod() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("Stop edgecontroller downstream syncPod loop")
			return
		case e := <-dc.podManager.Events():
			pod, ok := e.Object.(*v1.Pod)
			if !ok {
				// Log the received object's type; pod is always a nil *v1.Pod
				// when the assertion fails, so it carries no information.
				klog.Warningf("object type: %T unsupported", e.Object)
				continue
			}
			if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {
				continue
			}

			resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)
			if err != nil {
				klog.Warningf("built message resource failed with error: %s", err)
				continue
			}

			msg := model.NewMessage("")
			msg.Content = pod
			switch e.Type {
			case watch.Added:
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
				dc.lc.AddOrUpdatePod(*pod)
			case watch.Deleted:
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
			case watch.Modified:
				msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
				dc.lc.AddOrUpdatePod(*pod)
			default:
				// No router was set, so the message must not be sent.
				klog.Warningf("pod event type: %s unsupported", e.Type)
				continue
			}

			if err := dc.messageLayer.Send(*msg); err != nil {
				klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
			} else {
				klog.Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
			}
		}
	}
}
这里首先取出之前merge()完的event,然后创建一个新的msg,接着调用BuildResource、BuildRouter来创建msg的内容,最后通过Send发送msg.
BuildResource
参数是nodeID, namespace, resourceType, resourceID。对于pod类型的资源来说,nodeid是通过pod.Spec的NodeName得到的,namespace是pod的namespace,resourceType就是pod,resourceid则是pod自己的名称。其他类型的资源类似。
最终形成的resource的格式为 node/[nodeID]/[namespace]/[resourceType]/[resourceID] 如:node/edge-node/default/pod/nginx-deployment-d86dfb797-jntl7
BuildRouter
// BuildRouter fills in the message router: source and group first, then
// resource and operation. It returns msg so calls can be chained.
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
	return msg.SetRoute(source, group).SetResourceOperation(res, opr)
}
// SetResourceOperation stores res and opr as the router's resource and
// operation and returns msg.
func (msg *Message) SetResourceOperation(res, opr string) *Message {
	msg.Router.Resource, msg.Router.Operation = res, opr
	return msg
}
// SetRoute stores source and group as the router's source and group and
// returns msg.
func (msg *Message) SetRoute(source, group string) *Message {
	msg.Router.Source, msg.Router.Group = source, group
	return msg
}
BuildRouter是定义在beehive中的,第一个参数source代表来源,这里传入的是"edgecontroller";第二个参数group,这里传入的是“resource”(目前还没看到其他的group);第三个参数resource就是资源名称;最后一个参数opr是操作类型,比如Insert、Update、Delete等(所有操作定义在beehive\pkg\core\model\message.go中)
因此,可以看出BuildRouter就是在msg的Router字段填上信息,这个信息标识了这个msg要做什么操作、操作什么资源以及是谁发起的操作。
Send
messagelayer.Send调用了beehiveContext.Send(cml.SendModuleName, message),其中SendModuleName是配置文件中读取的,默认名称是“CloudHubControllerModuleName”
Send的消息最终会被CloudHub取走,CloudHub根据消息的Router,通过websocket发送到正确的edge节点上。
Cloudhub的详细处理流程见后续分析。
UpstreamController
upstreamController的处理流程和DownstreamController正好相反,这里就先不分析了。