KubeEdge分析-cloudcore-edgecontroller

注册

cloudhub.Register()
edgecontroller.Register()
devicecontroller.Register()

EdgeController

EdgeController是负责监听api-server的请求的,应该是cloudcore的入口。

edgecontroller.Register()

Register首先通过InitConfigure()初始化了一些配置信息,然后调用beehive的Register,最后调用Start

edgecontroller.Start()

Start方法分别创建了一个UpstreamController和一个DownstreamController,然后分别调用各种的Start方法

DownstreamController

NewDownstreamController

// NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
    lc := &manager.LocationCache{}

    cli, err := utils.KubeClient()
    if err != nil {
        klog.Warningf("create kube client failed with error: %s", err)
        return nil, err
    }

    var nodeName = ""
    if config.Get().EdgeSiteEnabled {
        if config.Get().KubeNodeName == "" {
            return nil, fmt.Errorf("kubeEdge node name is not provided in edgesite controller configuration")
        }
        nodeName = config.Get().KubeNodeName
    }

    podManager, err := manager.NewPodManager(cli, v1.NamespaceAll, nodeName)
    if err != nil {
        klog.Warningf("create pod manager failed with error: %s", err)
        return nil, err
    }

    configMapManager, err := manager.NewConfigMapManager(cli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("create configmap manager failed with error: %s", err)
        return nil, err
    }

    secretManager, err := manager.NewSecretManager(cli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("create secret manager failed with error: %s", err)
        return nil, err
    }

    nodesManager, err := manager.NewNodesManager(cli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("Create nodes manager failed with error: %s", err)
        return nil, err
    }

    serviceManager, err := manager.NewServiceManager(cli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("Create service manager failed with error: %s", err)
        return nil, err
    }

    endpointsManager, err := manager.NewEndpointsManager(cli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("Create endpoints manager failed with error: %s", err)
        return nil, err
    }

    ml, err := messagelayer.NewMessageLayer()
    if err != nil {
        klog.Warningf("create message layer failed with error: %s", err)
        return nil, err
    }

    dc := &DownstreamController{
        kubeClient:       cli,
        podManager:       podManager,
        configmapManager: configMapManager,
        secretManager:    secretManager,
        nodeManager:      nodesManager,
        serviceManager:   serviceManager,
        endpointsManager: endpointsManager,
        messageLayer:     ml,
        lc:               lc,
    }
    if err := dc.initLocating(); err != nil {
        return nil, err
    }

    return dc, nil
}

LocationCache 是保存node, service,pod, configmap, secret等的一个字典;

KubeClient就是调用的k8S的go client;EdgeSiteEnabled默认是false,猜测这个配置是让cloud也作为edge节点来用的;
后面就是启动各种XXManager。

看一个PodManager

PodManager

NewPodManager

// NewPodManager create PodManager from config
func NewPodManager(kubeClient *kubernetes.Clientset, namespace, nodeName string) (*PodManager, error) {
    var lw *cache.ListWatch
    if "" == nodeName {
        lw = cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, fields.Everything())
    } else {
        selector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
        lw = cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
    }
    realEvents := make(chan watch.Event, config.Get().PodEventBuffer)
    mergedEvents := make(chan watch.Event, config.Get().PodEventBuffer)
    rh := NewCommonResourceEventHandler(realEvents)
    si := cache.NewSharedInformer(lw, &v1.Pod{}, 0)
    si.AddEventHandler(rh)

    pm := &PodManager{realEvents: realEvents, mergedEvents: mergedEvents}

    stopNever := make(chan struct{})
    go si.Run(stopNever)
    go pm.merge()

    return pm, nil
}

这里就是使用K8S go client的ListWatch接口,注意nodeName默认是传入的空字符串,所以这里list的是所有pod。

realEvents和mergedEvents是两个带缓存的通道。

type CommonResourceEventHandler struct {
    events chan watch.Event
}

func (c *CommonResourceEventHandler) obj2Event(t watch.EventType, obj interface{}) {
    eventObj, ok := obj.(runtime.Object)
    if !ok {
        klog.Warningf("unknown type: %T, ignore", obj)
        return
    }
    c.events <- watch.Event{Type: t, Object: eventObj}
}

handler只有一个obj2Event方法,也就是把某个类型的event放到通道中。SharedInformer和AddEventHandler是go client的标准用法,就不仔细看了。

看一下merge方法

merge()

func (pm *PodManager) merge() {
    for re := range pm.realEvents {
        pod := re.Object.(*v1.Pod)
        switch re.Type {
        case watch.Added:
            pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
            if pod.DeletionTimestamp == nil {
                pm.mergedEvents <- re
            } else {
                re.Type = watch.Modified
                pm.mergedEvents <- re
            }
        case watch.Deleted:
            pm.pods.Delete(pod.UID)
            pm.mergedEvents <- re
        case watch.Modified:
            value, ok := pm.pods.Load(pod.UID)
            pm.pods.Store(pod.UID, &CachePod{ObjectMeta: pod.ObjectMeta, Spec: pod.Spec})
            if ok {
                cachedPod := value.(*CachePod)
                if pm.isPodUpdated(cachedPod, pod) {
                    pm.mergedEvents <- re
                }
            } else {
                pm.mergedEvents <- re
            }
        default:
            klog.Warningf("event type: %s unsupported", re.Type)
        }
    }
}

merge方法遍历所有real event(即watch api server得到的event),然后根据event的类型:添加、删除、更改进行不同的操作。
对添加pod的,首先先保存到pm.pods中,然后把event放到mergedEvents这个通道中(对于pod有DeletionTimestamp,也就是对于被删除重建的pod,修改type为modified);对delete操作,就在pm.pods中删掉这个pod,然后把event放到pm.mergedEvents中;对于modified的操作,先要取出原来的pod,然后创建一个新的CachePod对象放到pm.pod中。然后比较pod是否被修改,如果修改了,则把event放到pm.mergedEvents 中,否则就跳过这个event。

Events()

Events返回的是pm.mergedEvents,也就是已经处理过的event通道

回到NewDownstreamController中,最后在调用initLocating

initLocating()

先要取出edge node上label的key,默认是node-role.kubernetes.io/edge,后续list的时候,要根据这个key进行查找
然后遍历node,放到locationCache的EdgeNodes中,然后查询出所有在Edge节点上的Pod、ConfigMap、Secret等信息保存到locationCache中。

Start()

// Start DownstreamController
func (dc *DownstreamController) Start() error {
    klog.Info("start downstream controller")
    // pod
    go dc.syncPod()

    // configmap
    go dc.syncConfigMap()

    // secret
    go dc.syncSecret()

    // nodes
    go dc.syncEdgeNodes()

    // service
    go dc.syncService()

    // endpoints
    go dc.syncEndpoints()

    return nil
}

DownstreamController的start方法就起了6个syncXXX的go routine。这里具体看一下syncPod

syncPod

func (dc *DownstreamController) syncPod() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("Stop edgecontroller downstream syncPod loop")
            return
        case e := <-dc.podManager.Events():
            pod, ok := e.Object.(*v1.Pod)
            if !ok {
                klog.Warningf("object type: %T unsupported", pod)
                continue
            }
            if !dc.lc.IsEdgeNode(pod.Spec.NodeName) {
                continue
            }
            msg := model.NewMessage("")
            resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name)
            if err != nil {
                klog.Warningf("built message resource failed with error: %s", err)
                continue
            }
            msg.Content = pod
            switch e.Type {
            case watch.Added:
                msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.InsertOperation)
                dc.lc.AddOrUpdatePod(*pod)
            case watch.Deleted:
                msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.DeleteOperation)
            case watch.Modified:
                msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, model.UpdateOperation)
                dc.lc.AddOrUpdatePod(*pod)
            default:
                klog.Warningf("pod event type: %s unsupported", e.Type)
            }
            if err := dc.messageLayer.Send(*msg); err != nil {
                klog.Warningf("send message failed with error: %s, operation: %s, resource: %s", err, msg.GetOperation(), msg.GetResource())
            } else {
                klog.Infof("send message successfully, operation: %s, resource: %s", msg.GetOperation(), msg.GetResource())
            }
        }
    }
}

这里首先取出之前merge()完的event,然后创建一个新的msg,接着调用BuildResource、BuildRouter来创建msg的内容,最后通过Send发送msg.

BuildResource

参数是nodeID, namespace, resourceType, resourceID。对于pod类型的资源来说,nodeid是通过pod.Spec的NodeName得到的,namespace是pod的namespace,resourceType就是pod,resourceid则是pod自己的名称。其他类型的资源类似。

最终形成的resource的格式为 node/[nodeID]/[namespace]/[resourceType]/[resourceID] 如:node/edge-node/default/pod/nginx-deployment-d86dfb797-jntl7

BuildRouter

//BuildRouter sets route and resource operation in message
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
    msg.SetRoute(source, group)
    msg.SetResourceOperation(res, opr)
    return msg
}

//SetResourceOperation sets router resource and operation in message
func (msg *Message) SetResourceOperation(res, opr string) *Message {
    msg.Router.Resource = res
    msg.Router.Operation = opr
    return msg
}

//SetRoute sets router source and group in message
func (msg *Message) SetRoute(source, group string) *Message {
    msg.Router.Source = source
    msg.Router.Group = group
    return msg
}

BuildRouter是定义在beehive中的,第一个参数source代表来源,这里传入的是"edgecontroller";第二个参数group,这里传入的是“resource”(目前还没看到其他的group);第三个参数resource就是资源名称;最后一个参数opr是操作类型,比如Insert、Update、Delete等(所有操作定义在beehive\pkg\core\model\message.go中)

因此,可以看出BuildRouter就是在msg的Router字段填上信息,这个信息标识了这个msg要做什么操作、操作什么资源以及是谁发起的操作。

Send

messagelayer.Send调用了beehiveContext.Send(cml.SendModuleName, message),其中SendModuleName是配置文件中读取的,默认名称是“CloudHubControllerModuleName”

send的消息最终会被CloudHub取掉,cloudhub根据消息的Router,通过websocket发送到正确的edge节点上。

Cloudhub的详细处理流程见后续分析。

UpstreamController

upstreamController的处理流程和DownstreamController正好相反,这里就先不分析了。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容