KubeEdge分析-mapper与deviceTwin交互流程

整体架构

官方文档 https://github.com/kubeedge/kubeedge/blob/master/docs/mappers/modbus_mapper.md

kubeedge_arch.png

从架构图上可以看出,mapper和外界,包括DeviceTwin唯一的交互途径就是通过edge node内部的MQTT broker.

mapper通过订阅、发布相关的topic,实现与EdgeCore或者其他自己定义的APP进行交互

topic

先看下网关文档中的说明(https://docs.kubeedge.io/en/latest/guides/message_topics.html)

1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"

We will focus on the message expected on the first 3 topics.

"$hw/events/node/+/membership/get": This topics is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on "$hw/events/node/+/membership/get/result" topic.
"$hw/events/device/+/state/update”: This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated.
"$hw/events/device/+/twin/+": The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively.
Following is the explanation of the three suffix used:

update: this suffix is used to update the twin for the deviceID.
cloud_updated: this suffix is used to sync the twin status between edge and cloud.
get: is used to get twin status of a device. The response is published on "$hw/events/device/+/twin/get/result" topic.

另外,在eventbus的文档中也有一段描述

- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+

在看下代码,从modbus_mapper的constant.js中,可以看到mapper中的所有topic。

const defaultTopicPrefix = '$hw/events/device/';
const defaultDirectTopicPrefix = '$hw/devices/';
const twinDeltaTopic = defaultTopicPrefix + '+/twin/update/delta';
const twinUpdateTopic = '/twin/update';
const twinGetResTopic = defaultTopicPrefix + '+/twin/get/result';
const twinGetTopic = '/twin/get';
const directGetTopic = '/events/properties/get';

从代码中可以看出,有两类topic,一类是类似广播类型的hw/events/device/ ,另一类是针对某个特定设备的hw/devices/。

另外,文档和代码中的topic是有区别的,代码中并没有node、SYS相关的topic

topic详细分析

基本概念

mqtt的topic是支持通配符的(wildcard),有3类通配符,分别是

  • + 加号,匹配一段
  • # 井号,匹配多段
  • $ 美元符,用在首位,防止被+、#通配符匹配

mqtt client是可以订阅多个topic的(传入topic列表),不过modbus_mapper的实现中,并没有用这种方式,而是每个client只用来发布、订阅一个topic。

这种方式下,on_message就不需要在判断收到的msg是相应哪个topic的了,可以直接进入处理流程。

node/+/membership

在server.go中,有相关的订阅,但是在mapper中没看到有推送,这里后续再研究

// onSubscribe will be called if the topic is matched in topic tree.
func (m *Server) onSubscribe(msg *packet.Message) {
    // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
    // for other, send to hub
    // for "SYS/dis/upload_records", no need to base64 topic
    var target string
    resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic))
    if strings.HasPrefix(msg.Topic, "$hw/events/device") || strings.HasPrefix(msg.Topic, "$hw/events/node") {
        target = modules.TwinGroup
    } else {
        target = modules.HubGroup
        if msg.Topic == "SYS/dis/upload_records" {
            resource = "SYS/dis/upload_records"
        }
    }
    // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
    message := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
        resource, "response").FillBody(string(msg.Payload))
    klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, resource))
    beehiveContext.SendToGroup(target, *message)
}

twinGetResTopic

$hw/events/device/+/twin/get/result

Mapper订阅这个twinGetResTopic,注意中间的通配符加号,这里匹配的应该是任意设备ID

        mqtt_client.on('connect', ()=>{
            logger.info('connetced to edge mqtt with topic twinGet');
            mqtt_client.subscribe(constant.twinGetResTopic);
            for (let instance of devIns) {
                dt.getActuals(instance[0]);
            }
        });

对这个订阅,收到的消息处理代码如下

        mqtt_client.on('message', (topic, message)=>{
            try {
                var msgGet = JSON.parse(message.toString());
            } catch (err) {
                logger.error('unmarshal error');
                return;
            }
            let resources = topic.toString().split('/');
            let deviceID = resources[3];
            let dt = new DeviceTwin(mqtt_client);
            let devProtocol, devInstance;
            if (devPro.has(deviceID) && devIns.has(deviceID)) {
                devProtocol = devPro.get(deviceID);
                devInstance = devIns.get(deviceID);
            } else {
                logger.error('match visitor failed');
            }
            logger.info('recieve twinGet msg, set properties actual value map');
            if (resources.length === 7 && resources[5] === 'get' && msgGet != null && msgGet.code != 404 && typeof(devProtocol) != 'undefined' && typeof(devInstance) != 'undefined') {
                dt.setActuals(msgGet, (PropActuals)=>{
                    for (let actual of PropActuals) {
                        ActualVal.set(util.format('%s-%s', deviceID, actual[0]), actual[1]);
                    }
                });
                dt.setExpecteds(msgGet, (PropExpecteds)=>{
                    for (let expected of PropExpecteds) {
                        modbusProtocolTransfer(devProtocol.protocol, (transferedProtocol)=>{
                            if (modVistr.has(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol))) {
                                let visitor = modVistr.get(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol));
                                dealDeltaMsg(msgGet, expected[0], visitor, devProtocol, expected[1]);
                            }
                        });
                    }
                });
            }
        }

代码中首先根据topic的名称取出device id,然后根据本地缓存的device profile检查是否存在这个id,如果存在,那么取出改设备对应的协议、实例等信息,然后依次调用
setActuals和setExpecteds方法。

setActuals

取出topic消息体中个属性的actual的值,然后放到ActualVal这个全局的map中保存。

(从代码中看,setActuals并不会改变设备上的数据,只会记录在ActualVal。而ActualVal是会定期和设备上实际的数据进行同步的,并且以设备数据为准,所以这里setActual的目的不明)

setExpecteds

取出topic消息体中个属性的expected的值(不能同时有actual和expected,否则是不做处理的),然后调用协议驱动(比如modbus协议驱动),将值写入到设备中。

twinGetTopic

除了订阅和处理twinGetResTopic消息,index.js在初始化的时候,还干了一件事:遍历设备,调用deviceTwin的getActuals

    // getActuals publish get devicetwin msg to edge mqtt
    getActuals(deviceID) {
        let payload_msg = {
            event_id: "",
            timestamp: new Date().getTime()
        };
        this.mqttClient.publish(constant.defaultTopicPrefix + deviceID + constant.twinGetTopic, JSON.stringify(payload_msg));
    }

topic由具体的deviceID一起组成,'$hw/events/device/123456789/twin/get
device的列表是由configMap传入并维护的,mapper会为每个device都发布一个独立的twinGetTopic

上一节分析的twinGetResTopic,实际就是对twinGetTopic的response。deviceID是从configMap中取出来的,因此,mapper是根据configMap中配置的device instance列表,发布twinGetTopic,edgecore收到topic后,发布twinGetResTopic。mapper通过订阅twinGetResTopic就可以获得所有device instance从云端发来的数据。

twinDeltaTopic

$hw/events/device/+/twin/update/delta

twinDeltaTopic是在一个新的client中订阅的,和之前twinGetResTopic处理形式类似,,也是从topic中取出device的id,然后再缓存中找到该设备对应的profile信息

然后从topic中的msg中取出数据,遍历每个key(也就是property的名称)通过如下方法进行处理:

                                DeviceTwin.syncExpected(msg, key, (value)=>{
                                    dealDeltaMsg(msg, key, visitor, devProtocol, value);
                                });
    // syncExpected check whether expected value should be update to device
    static syncExpected(delta, key, callback) {
        let deviceTwin = delta.twin[key];
        if (!delta.twin.hasOwnProperty(key)) {
            logger.error("Invalid device twin ", key);
            return;
        }
        if (!deviceTwin.hasOwnProperty('actual') ||
          (deviceTwin.hasOwnProperty('expected') && deviceTwin.expected.hasOwnProperty('metadata') && deviceTwin.actual.hasOwnProperty('metadata') && 
            deviceTwin.expected.metadata.timestamp > deviceTwin.actual.metadata.timestamp &&
            deviceTwin.expected.value !== deviceTwin.actual.value)) {
          callback(deviceTwin.expected.value);
        }
    }

这里逻辑还比较复杂,总体上就是deviceTwin这个对象中需要有expected,但没有actual,并且expected的更新时间晚于actual,并且expected和actual值不等的时候,调用dealDeltaMsg对实际设备上的寄存器进行更新。

deviceProfile更新

index.js会监听deviceProfile更新,当cloud通过configmap更新deviceProfile时,会重新创建一个mqtt的client,来监听twinGetResTopic的topic。

对topic的处理和之前写的twinGetResTopic处理逻辑基本一致,只是少了setActuals的步骤,而只调用setExpecteds。

从这里也可以看出,setActuals只是mapper第一次启动的时候进行一下初始化。

syncDeviceTwin定时任务

syncDeviceTwin这个定时任务每两秒执行一次

定时任务中,遍历本地缓存总的所有设备,通过modbus读取设备上最新的数据,然后更新本地的ActualVals缓存,

twinUpdateTopic

然后调用updateActual方法,通过$hw/events/device/123456789/twin/update topic推送device twin的更新

directGetTopic

接着调用UpdateDirectActuals方法,往$hw/devices/123456789/events/properties/get 的topic中推送最新的设备属性数据

两者区别

从上述分析可以看出,定时任务会通过mqtt推送两类数据,看一下他们的区别。

  • directGetTopic只在每次定时任务结束的时候调用一次,而twinUpdateTopic是每个设备属性的值发生变化时,就调用一次。
  • directGetTopic每次更新这个设备的所有属性,twinUpdateTopic每次只更新一个设备的一个属性
  • directGetTopic有些额外的字段,比如route、content与header字段(具体用途见edge core的device twin模块分析)

Edge端

edged的eventbus会监控边缘MQTT的topic,根据topic名称和内容进行相应处理。

eventbus

首先确认一点,所有与MQTT broker的交互,都是通过eventbus来做的,eventbus从MQTT订阅消息以后,再通过beehive发送到其他模块,如devicetwin模块

// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
    klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
    // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
    // for other, send to hub
    // for "SYS/dis/upload_records", no need to base64 topic
    var target string
    resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
    if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
        target = modules.TwinGroup
    } else {
        target = modules.HubGroup
        if message.Topic() == "SYS/dis/upload_records" {
            resource = "SYS/dis/upload_records"
        }
    }
    // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
    msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
        resource, "response").FillBody(string(message.Payload()))
    klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
    beehiveContext.SendToGroup(target, *msg)
}
// BuildRouter sets route and resource operation in message
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
    msg.SetRoute(source, group)
    msg.SetResourceOperation(res, opr)
    return msg
}

  • resource是通过base64编码的完整的topic的名字

  • target对于"hw/events/device"和"hw/events/node"这两个类型的topic,target是 twinGroup,否则是HubGroup

  • router key是 routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>

  • 根据BuildRouter的参数可以看出,这里的source是bus,group是user

devicetwin

参考官方文档 https://github.com/kubeedge/kubeedge/blob/master/docs/modules/edge/devicetwin.md

deviceTwin.png

devicetwin从beehive中取到group为twin的消息,
然后调用classifyMsg来对消息进行分类。这里的分类方式,对devicetwin的处理流程有比较大的帮助,所以仔细看下

func classifyMsg(message *dttype.DTMessage) bool {
    if EventActionMap == nil {
        initEventActionMap()
    }
    var identity string
    var action string
    msgSource := message.Msg.GetSource()
    if strings.Compare(msgSource, "bus") == 0 {
        idLoc := 3
        topic := message.Msg.GetResource()
        topicByte, err := base64.URLEncoding.DecodeString(topic)
        if err != nil {
            return false
        }
        topic = string(topicByte)

        klog.Infof("classify the msg with the topic %s", topic)
        splitString := strings.Split(topic, "/")
        if len(splitString) == 4 {
            if strings.HasPrefix(topic, dtcommon.LifeCycleConnectETPrefix) {
                action = dtcommon.LifeCycle
            } else if strings.HasPrefix(topic, dtcommon.LifeCycleDisconnectETPrefix) {
                action = dtcommon.LifeCycle
            } else {
                return false
            }
        } else {
            identity = splitString[idLoc]
            loc := strings.Index(topic, identity)
            nextLoc := loc + len(identity)
            prefix := topic[0:loc]
            suffix := topic[nextLoc:]
            klog.Infof("%s %s", prefix, suffix)
            if v, exist := EventActionMap[prefix][suffix]; exist {
                action = v
            } else {
                return false
            }
        }
        message.Msg.Content = []byte((message.Msg.Content).(string))
        message.Identity = identity
        message.Action = action
        klog.Infof("Classify the msg to action %s", action)
        return true
    } else if (strings.Compare(msgSource, "edgemgr") == 0) || (strings.Compare(msgSource, "devicecontroller") == 0) {
        switch message.Msg.Content.(type) {
        case []byte:
            klog.Info("Message content type is []byte, no need to marshal again")
        default:
            content, err := json.Marshal(message.Msg.Content)
            if err != nil {
                return false
            }
            message.Msg.Content = content
        }
        if strings.Contains(message.Msg.Router.Resource, "membership/detail") {
            message.Action = dtcommon.MemDetailResult
            return true
        } else if strings.Contains(message.Msg.Router.Resource, "membership") {
            message.Action = dtcommon.MemUpdated
            return true
        } else if strings.Contains(message.Msg.Router.Resource, "twin/cloud_updated") {
            message.Action = dtcommon.TwinCloudSync
            resources := strings.Split(message.Msg.Router.Resource, "/")
            message.Identity = resources[1]
            return true
        } else if strings.Contains(message.Msg.Router.Operation, "updated") {
            resources := strings.Split(message.Msg.Router.Resource, "/")
            if len(resources) == 2 && strings.Compare(resources[0], "device") == 0 {
                message.Action = dtcommon.DeviceUpdated
                message.Identity = resources[1]
            }
            return true
        }
        return false

    } else if strings.Compare(msgSource, "edgehub") == 0 {
        if strings.Compare(message.Msg.Router.Resource, "node/connection") == 0 {
            message.Action = dtcommon.LifeCycle
            return true
        }
        return false
    }
    return false
}

classifyMsg中首先判断的是msg的source,从代码中看,分别有bus、edgemgr、devicecontroller、edgehub四种。

从eventbus中过来的消息,就是bus;从edgehub过来的消息目前看就只有心跳;edgemgr的source从代码中没看到,目前应该是用不到;devicecontroller过来的消息应该是云端对device的操作;

这里在分析mapper与devicetwin的交互,所以先看bus的类型的消息。

对于"$hw/events/connected/%s"类型的消息,是心跳类型,返回false,也就是不需要处理

对于其他消息,也就是"$hw/events/device/123456789/twin/update"的消息,identity就是取的第四个字段,也就是device的ID

action则是根据前缀$hw/events/device/和后缀twin/update这两部分查表(EventActionMap)查出来的,

通过这两部分进行查表,最终进入到处理方法dealTwinUpdate方法中

dealTwinUpdate.png

遗留问题

twinDeltaTopic和twinGetResTopic区别

twinDeltaTopic和twinGetResTopic都可以更新设备上的值,两者的区别在哪里?

  • BuildDeviceTwinResult
  • BuildDeviceTwinDelta

directGetTopic没有接受方

directGetTopic从代码中看,是没有接收方到,也就是说这个消息不会被处理

mapper更新了所有属性

从modbus mapper中可以看到msg twin的一个结构,即一个key是Property名称的字典

        let reply_msg = {
            event_id: "",
            timestamp: new Date().getTime()
        };
        let twin = {};
        twin[property.name] = {
            actual: {
                value: String(value),
                metadata: {
                    timestamp: new Date().getTime()
                }
            },
            metadata: {
                tyep: property.dataType
            }
        };
        reply_msg.twin = twin;

所以,最终eventbus就是从modbus mapper中取出msg twin来进行后续处理。

但这里其实是有个问题的,modbus mapper的当前实现上,没有判断一个属性是否是twin属性,
直接将所有属性都做为twin属性给publish了,这个问题在后续应该会进行修复。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容