概述
devicecontroller是管理边缘网关所连接的设备的,在K8S controller中注册了devicemodel和device两种CRD。
Register
初始化配置,并在beehive中注册。
Start
分别创建DownstreamController和UpstreamController并启动
DownstreamController
NewDownstreamController
// NewDownstreamController create a DownstreamController from config
func NewDownstreamController() (*DownstreamController, error) {
cli, err := utils.KubeClient()
if err != nil {
klog.Warningf("Create kube client failed with error: %s", err)
return nil, err
}
config, err := utils.KubeConfig()
if err != nil {
klog.Warningf("Get kubeConfig error: %v", err)
return nil, err
}
crdcli, err := utils.NewCRDClient(config)
deviceManager, err := manager.NewDeviceManager(crdcli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}
deviceModelManager, err := manager.NewDeviceModelManager(crdcli, v1.NamespaceAll)
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}
cm := manager.NewConfigMapManager()
ml, err := messagelayer.NewMessageLayer()
if err != nil {
klog.Warningf("Create message layer failed with error: %s", err)
return nil, err
}
dc := &DownstreamController{
kubeClient: cli,
deviceManager: deviceManager,
deviceModelManager: deviceModelManager,
messageLayer: ml,
configMapManager: cm,
}
return dc, nil
}
初始化了KubeClient、CRDClient、DeviceManager、DeviceModelManager、ConfigMapManager以供后续使用。
XXManager中都有一个map,用来记录k8s中的对象的状态,以及一个ListWatchClient,用来向API server查询状态,查询到的状态会放到events这个通道中。
Start
// Start DownstreamController
func (dc *DownstreamController) Start() error {
klog.Info("Start downstream devicecontroller")
go dc.syncDeviceModel()
// Wait for adding all device model
// TODO need to think about sync
time.Sleep(1 * time.Second)
go dc.syncDevice()
return nil
}
start里就起了2个go routine,一个是用来同步DeviceModel,另外一个是用来同步Device的
syncDeviceModel
// syncDeviceModel is used to get events from informer
func (dc *DownstreamController) syncDeviceModel() {
for {
select {
case <-beehiveContext.Done():
klog.Info("stop syncDeviceModel")
return
case e := <-dc.deviceModelManager.Events():
deviceModel, ok := e.Object.(*v1alpha1.DeviceModel)
if !ok {
klog.Warningf("object type: %T unsupported", deviceModel)
continue
}
switch e.Type {
case watch.Added:
dc.deviceModelAdded(deviceModel)
case watch.Deleted:
dc.deviceModelDeleted(deviceModel)
case watch.Modified:
dc.deviceModelUpdated(deviceModel)
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
}
}
}
从deviceModelManager获取deviceModel相关的事件,根据事件类型触发设备模型的添加、删除、更新等操作(与edgemanager有个区别,这边的event是没有经过merge的)
deviceModelAdded
添加devicemodel只会在本地缓存中留下一个记录,不会下发到网关设备上
deviceModelDeleted
删除devicemodel目前只会在本地缓存中删掉这个记录,TODO中说明应该要删除这个模型关联的所有设备
deviceModelUpdated
// deviceModelUpdated is function to process updated deviceModel
func (dc *DownstreamController) deviceModelUpdated(deviceModel *v1alpha1.DeviceModel) {
value, ok := dc.deviceModelManager.DeviceModel.Load(deviceModel.Name)
dc.deviceModelManager.DeviceModel.Store(deviceModel.Name, deviceModel)
if ok {
cachedDeviceModel := value.(*v1alpha1.DeviceModel)
if isDeviceModelUpdated(cachedDeviceModel, deviceModel) {
dc.updateAllConfigMaps(deviceModel)
}
} else {
dc.deviceModelAdded(deviceModel)
}
}
这个除了更新缓存,剩下的动作就是updateAllConfigMaps
updateAllConfigMaps
TODO: add logic to update all config maps, How to manage if a property is deleted but a device is referring that property. Need to come up with a design.
目前只有一个说明,还没实现。
syncDevice
与syncDeviceModel类似,从deviceManager获取device相关的事件,根据事件类型触发设备模型的添加、删除、更新等操作
deviceAdded
// deviceAdded creates a device, adds in deviceManagers map, send a message to edge node if node selector is present.
func (dc *DownstreamController) deviceAdded(device *v1alpha1.Device) {
dc.deviceManager.Device.Store(device.Name, device)
if len(device.Spec.NodeSelector.NodeSelectorTerms) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions) != 0 && len(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values) != 0 {
dc.addToConfigMap(device)
edgeDevice := createDevice(device)
msg := model.NewMessage("")
resource, err := messagelayer.BuildResource(device.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions[0].Values[0], "membership", "")
if err != nil {
klog.Warningf("Built message resource failed with error: %s", err)
return
}
msg.BuildRouter(constants.DeviceControllerModuleName, constants.GroupTwin, resource, model.UpdateOperation)
content := types.MembershipUpdate{AddDevices: []types.Device{
edgeDevice,
}}
content.EventID = uuid.NewV4().String()
content.Timestamp = time.Now().UnixNano() / 1e6
msg.Content = content
err = dc.messageLayer.Send(*msg)
if err != nil {
klog.Errorf("Failed to send device addition message %v due to error %v", msg, err)
}
}
}
deviceAdd相比deviceModelAdd就要复杂很多,这边仔细分析一下
首先是在manager的缓存中保存一份,这个和devcieModelAdd一致。
接着调用addToConfigMap。在mapper控制分析中提过,所有mapper的配置都是通过configMap下发的,因此,这里在云端添加一个device,就需要更新或创建对应节点的configMap(这个方法中没有更新边缘节点,只更新了api-server, 由edgecontroller watch到configmap的更新以后,会再同步到边缘节点)。addToConfigMap又调用了addDeviceInstanceAndProtocol和addDeviceModelAndVisitors来生成DeviceModel、Visitors、DeviceInstanc、Protocol配置(从代码中看出,Protocol是硬编码进去的,目前仅支持OPC-UA、Modbus-TCP/RTU和蓝牙。
然后是createDevice,用设备名称创建了一个edgeDevice对象(和api-server中的device对象不同),设备名称需要唯一。然后是为每个属性设置twin对象,key是属性名称,value包含Expected、Optional、Metadata等字段。
再然后是BuildResource和BuildRouter,BuildResource传入的参数和之前edgecontroller中的区别较大,在namespace的位置传入了"membership",资源类型和资源ID传入了空值。
BuildRouter没有啥特别的地方。
接下来就是把edgeDevice放到msg的Content字段中,并添加了时间戳和eventID ,最后调用Send方法。(和edgecontroller不同的是,edgecontroller sync的消息中,content是Pod、ConfigMap等对象,这里是个device的list)
UpstreamController
upstreamController的流程和DownStreamController正好相反,这里就不详细分析了。