KubeEdge分析-cloudcore-devicecontroller

概述

devicecontroller是管理边缘网关所连接的设备的,在K8S controller中注册了devicemodel和device两种CRD。

Register

初始化配置,并在beehive中注册。

Start

分别创建DownstreamController和UpstreamController并启动

DownstreamController

NewDownstreamController

// NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
    cli, err := utils.KubeClient()
    if err != nil {
        klog.Warningf("Create kube client failed with error: %s", err)
        return nil, err
    }

    config, err := utils.KubeConfig()
    if err != nil {
        klog.Warningf("Get kubeConfig error: %v", err)
        return nil, err
    }

    crdcli, err := utils.NewCRDClient(config)
    deviceManager, err := manager.NewDeviceManager(crdcli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("Create device manager failed with error: %s", err)
        return nil, err
    }

    deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)
    if err != nil {
        klog.Warningf("Create device manager failed with error: %s", err)
        return nil, err
    }

    cm := manager.NewConfigMapManager()

    ml, err := messagelayer.NewMessageLayer()
    if err != nil {
        klog.Warningf("Create message layer failed with error: %s", err)
        return nil, err
    }

    dc := &DownstreamController{
        kubeClient:         cli,
        deviceManager:      deviceManager,
        deviceModelManager: deviceModelManager,
        messageLayer:       ml,
        configMapManager:   cm,
    }
    return dc, nil
}

初始化了KubeClient、CRDClient、DeviceManager、DeviceModelManager、ConfigMapManager以供后续使用。

XXManager中都有一个map,用来记录k8s中的对象的状态,以及一个ListWatchClient,用来向API server查询状态,查询到的状态会放到events这个通道中。

Start

// Start DownstreamController
func (dc *DownstreamController) Start() error {
    klog.Info("Start downstream devicecontroller")

    go dc.syncDeviceModel()

    // Wait for adding all device model
    // TODO need to think about sync
    time.Sleep(1 * time.Second)
    go dc.syncDevice()

    return nil
}

start里就起了2个go routine,一个是用来同步DeviceModel,另外一个是用来同步Device的

syncDeviceModel

// syncDeviceModel is used to get events from informer
func (dc *DownstreamController) syncDeviceModel() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Info("stop syncDeviceModel")
            return
        case e := <-dc.deviceModelManager.Events():
            deviceModel, ok := e.Object.(*v1alpha1.DeviceModel)
            if !ok {
                klog.Warningf("object type: %T unsupported", deviceModel)
                continue
            }
            switch e.Type {
            case watch.Added:
                dc.deviceModelAdded(deviceModel)
            case watch.Deleted:
                dc.deviceModelDeleted(deviceModel)
            case watch.Modified:
                dc.deviceModelUpdated(deviceModel)
            default:
                klog.Warningf("deviceModel event type: %s unsupported", e.Type)
            }
        }
    }
}

从deviceModelManager获取deviceModel相关的事件,根据事件类型触发设备模型的添加、删除、更新等操作(与edgemanager有个区别,这边的event是没有经过merge的)

deviceModelAdded

添加devicemodel只会在本地缓存中留下一个记录,不会下发到网关设备上

deviceModelDeleted

删除devicemodel目前只会在本地缓存中删掉这个记录,TODO中说明应该要删除这个模型关联的所有设备

deviceModelUpdated

// deviceModelUpdated is function to process updated deviceModel
func (dc *DownstreamController) deviceModelUpdated(deviceModel *v1alpha1.DeviceModel) {
    value, ok := dc.deviceModelManager.DeviceModel.Load(deviceModel.Name)
    dc.deviceModelManager.DeviceModel.Store(deviceModel.Name, deviceModel)
    if ok {
        cachedDeviceModel := value.(*v1alpha1.DeviceModel)
        if isDeviceModelUpdated(cachedDeviceModel, deviceModel) {
            dc.updateAllConfigMaps(deviceModel)
        }
    } else {
        dc.deviceModelAdded(deviceModel)
    }
}

这个除了更新缓存,剩下的动作就是updateAllConfigMaps

updateAllConfigMaps

TODO: add logic to update all config maps, How to manage if a property is deleted but a device is referring that property. Need to come up with a design.

目前只有一个说明,还没实现。

syncDevice

与syncDeviceModel类似,从deviceManager获取device相关的事件,根据事件类型触发设备模型的添加、删除、更新等操作

deviceAdded

// deviceAdded creates a device, adds in deviceManagers map, send a message to edge node if node selector is present.
func (dc *DownstreamController) deviceAdded(device *v1alpha1.Device) {
    dc.deviceManager.Device.Store(device.Name, device)
    if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 {
        dc.addToConfigMap(device)
        edgeDevice := createDevice(device)
        msg := model.NewMessage("")

        resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "")
        if err != nil {
            klog.Warningf("Built message resource failed with error: %s", err)
            return
        }
        msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)

        content := types.MembershipUpdate{AddDevices: []types.Device{
            edgeDevice,
        }}
        content.EventID = uuid.NewV4().String()
        content.Timestamp = time.Now().UnixNano() / 1e6
        msg.Content = content

        err = dc.messageLayer.Send(*msg)
        if err != nil {
            klog.Errorf("Failed to send device addition message %v due to error %v", msg, err)
        }
    }
}

deviceAdd相比deviceModelAdd就要复杂很多,这边仔细分析一下

syncDevice.png

首先是在manager的缓存中保存一份,这个和devcieModelAdd一致。

接着调用addToConfigMap。在mapper控制分析中提过,所有mapper的配置都是通过configMap下发的,因此,这里在云端添加一个device,就需要更新或创建对应节点的configMap(这个方法中没有更新边缘节点,只更新了api-server, 由edgecontroller watch到configmap的更新以后,会再同步到边缘节点)。addToConfigMap又调用了addDeviceInstanceAndProtocol和addDeviceModelAndVisitors来生成DeviceModel、Visitors、DeviceInstanc、Protocol配置(从代码中看出,Protocol是硬编码进去的,目前仅支持OPC-UA、Modbus-TCP/RTU和蓝牙。

然后是createDevice,用设备名称创建了一个edgeDevice对象(和api-server中的device对象不同),设备名称需要唯一。然后是为每个属性设置twin对象,key是属性名称,value包含Expected、Optional、Metadata等字段。

再然后是BuildResource和BuildRouter,BuildResource传入的参数和之前edgecontroller中的区别较大,在namespace的位置传入了"membership",资源类型和资源ID传入了空值。
BuildRouter没有啥特别的地方。

接下来就是把edgeDevice放到msg的Content字段中,并添加了时间戳和eventID ,最后调用Send方法。(和edgecontroller不同的是,edgecontroller sync的消息中,content是Pod、ConfigMap等对象,这里是个device的list)

UpstreamController

upstreamController的流程和DownStreamController正好相反,这里就不详细分析了。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。