整体架构
官方文档 https://github.com/kubeedge/kubeedge/blob/master/docs/mappers/modbus_mapper.md

从架构图上可以看出,mapper和外界,包括DeviceTwin唯一的交互途径就是通过edge node内部的MQTT broker.
mapper通过订阅、发布相关的topic,实现与EdgeCore或者其他自己定义的APP进行交互
topic
先看下网关文档中的说明(https://docs.kubeedge.io/en/latest/guides/message_topics.html)
1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
We will focus on the message expected on the first 3 topics.
"$hw/events/node/+/membership/get": This topics is used to get membership details of a node i.e the devices that are associated with the node. The response of the message is published on "$hw/events/node/+/membership/get/result" topic.
"$hw/events/device/+/state/update”: This topic is used to update the state of the device. + symbol can be replaced with ID of the device whose state is to be updated.
"$hw/events/device/+/twin/+": The two + symbols can be replaced by the deviceID on whose twin the operation is to be performed and any one of(update,cloud_updated,get) respectively.
Following is the explanation of the three suffix used:
update: this suffix is used to update the twin for the deviceID.
cloud_updated: this suffix is used to sync the twin status between edge and cloud.
get: is used to get twin status of a device. The response is published on "$hw/events/device/+/twin/get/result" topic.
另外,在eventbus的文档中也有一段描述
- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+
在看下代码,从modbus_mapper的constant.js中,可以看到mapper中的所有topic。
const defaultTopicPrefix = '$hw/events/device/';
const defaultDirectTopicPrefix = '$hw/devices/';
const twinDeltaTopic = defaultTopicPrefix + '+/twin/update/delta';
const twinUpdateTopic = '/twin/update';
const twinGetResTopic = defaultTopicPrefix + '+/twin/get/result';
const twinGetTopic = '/twin/get';
const directGetTopic = '/events/properties/get';
从代码中可以看出,有两类topic,一类是类似广播类型的hw/devices/。
另外,文档和代码中的topic是有区别的,代码中并没有node、SYS相关的topic
topic详细分析
基本概念
mqtt的topic是支持通配符的(wildcard),有3类通配符,分别是
- + 加号,匹配一段
- # 井号,匹配多段
- $ 美元符,用在首位,防止被+、#通配符匹配
mqtt client是可以订阅多个topic的(传入topic列表),不过modbus_mapper的实现中,并没有用这种方式,而是每个client只用来发布、订阅一个topic。
这种方式下,on_message就不需要在判断收到的msg是相应哪个topic的了,可以直接进入处理流程。
node/+/membership
在server.go中,有相关的订阅,但是在mapper中没看到有推送,这里后续再研究
// onSubscribe will be called if the topic is matched in topic tree.
func (m *Server) onSubscribe(msg *packet.Message) {
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic))
if strings.HasPrefix(msg.Topic, "$hw/events/device") || strings.HasPrefix(msg.Topic, "$hw/events/node") {
target = modules.TwinGroup
} else {
target = modules.HubGroup
if msg.Topic == "SYS/dis/upload_records" {
resource = "SYS/dis/upload_records"
}
}
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
message := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
resource, "response").FillBody(string(msg.Payload))
klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, resource))
beehiveContext.SendToGroup(target, *message)
}
twinGetResTopic
$hw/events/device/+/twin/get/result
Mapper订阅这个twinGetResTopic,注意中间的通配符加号,这里匹配的应该是任意设备ID
mqtt_client.on('connect', ()=>{
logger.info('connetced to edge mqtt with topic twinGet');
mqtt_client.subscribe(constant.twinGetResTopic);
for (let instance of devIns) {
dt.getActuals(instance[0]);
}
});
对这个订阅,收到的消息处理代码如下
mqtt_client.on('message', (topic, message)=>{
try {
var msgGet = JSON.parse(message.toString());
} catch (err) {
logger.error('unmarshal error');
return;
}
let resources = topic.toString().split('/');
let deviceID = resources[3];
let dt = new DeviceTwin(mqtt_client);
let devProtocol, devInstance;
if (devPro.has(deviceID) && devIns.has(deviceID)) {
devProtocol = devPro.get(deviceID);
devInstance = devIns.get(deviceID);
} else {
logger.error('match visitor failed');
}
logger.info('recieve twinGet msg, set properties actual value map');
if (resources.length === 7 && resources[5] === 'get' && msgGet != null && msgGet.code != 404 && typeof(devProtocol) != 'undefined' && typeof(devInstance) != 'undefined') {
dt.setActuals(msgGet, (PropActuals)=>{
for (let actual of PropActuals) {
ActualVal.set(util.format('%s-%s', deviceID, actual[0]), actual[1]);
}
});
dt.setExpecteds(msgGet, (PropExpecteds)=>{
for (let expected of PropExpecteds) {
modbusProtocolTransfer(devProtocol.protocol, (transferedProtocol)=>{
if (modVistr.has(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol))) {
let visitor = modVistr.get(util.format('%s-%s-%s', devInstance.model, expected[0], transferedProtocol));
dealDeltaMsg(msgGet, expected[0], visitor, devProtocol, expected[1]);
}
});
}
});
}
}
代码中首先根据topic的名称取出device id,然后根据本地缓存的device profile检查是否存在这个id,如果存在,那么取出改设备对应的协议、实例等信息,然后依次调用
setActuals和setExpecteds方法。
setActuals
取出topic消息体中个属性的actual的值,然后放到ActualVal这个全局的map中保存。
(从代码中看,setActuals并不会改变设备上的数据,只会记录在ActualVal。而ActualVal是会定期和设备上实际的数据进行同步的,并且以设备数据为准,所以这里setActual的目的不明)
setExpecteds
取出topic消息体中个属性的expected的值(不能同时有actual和expected,否则是不做处理的),然后调用协议驱动(比如modbus协议驱动),将值写入到设备中。
twinGetTopic
除了订阅和处理twinGetResTopic消息,index.js在初始化的时候,还干了一件事:遍历设备,调用deviceTwin的getActuals
// getActuals publish get devicetwin msg to edge mqtt
getActuals(deviceID) {
let payload_msg = {
event_id: "",
timestamp: new Date().getTime()
};
this.mqttClient.publish(constant.defaultTopicPrefix + deviceID + constant.twinGetTopic, JSON.stringify(payload_msg));
}
topic由具体的deviceID一起组成,'$hw/events/device/123456789/twin/get
device的列表是由configMap传入并维护的,mapper会为每个device都发布一个独立的twinGetTopic
上一节分析的twinGetResTopic,实际就是对twinGetTopic的response。deviceID是从configMap中取出来的,因此,mapper是根据configMap中配置的device instance列表,发布twinGetTopic,edgecore收到topic后,发布twinGetResTopic。mapper通过订阅twinGetResTopic就可以获得所有device instance从云端发来的数据。
twinDeltaTopic
$hw/events/device/+/twin/update/delta
twinDeltaTopic是在一个新的client中订阅的,和之前twinGetResTopic处理形式类似,,也是从topic中取出device的id,然后再缓存中找到该设备对应的profile信息
然后从topic中的msg中取出数据,遍历每个key(也就是property的名称)通过如下方法进行处理:
DeviceTwin.syncExpected(msg, key, (value)=>{
dealDeltaMsg(msg, key, visitor, devProtocol, value);
});
// syncExpected check whether expected value should be update to device
static syncExpected(delta, key, callback) {
let deviceTwin = delta.twin[key];
if (!delta.twin.hasOwnProperty(key)) {
logger.error("Invalid device twin ", key);
return;
}
if (!deviceTwin.hasOwnProperty('actual') ||
(deviceTwin.hasOwnProperty('expected') && deviceTwin.expected.hasOwnProperty('metadata') && deviceTwin.actual.hasOwnProperty('metadata') &&
deviceTwin.expected.metadata.timestamp > deviceTwin.actual.metadata.timestamp &&
deviceTwin.expected.value !== deviceTwin.actual.value)) {
callback(deviceTwin.expected.value);
}
}
这里逻辑还比较复杂,总体上就是deviceTwin这个对象中需要有expected,但没有actual,并且expected的更新时间晚于actual,并且expected和actual值不等的时候,调用dealDeltaMsg对实际设备上的寄存器进行更新。
deviceProfile更新
index.js会监听deviceProfile更新,当cloud通过configmap更新deviceProfile时,会重新创建一个mqtt的client,来监听twinGetResTopic的topic。
对topic的处理和之前写的twinGetResTopic处理逻辑基本一致,只是少了setActuals的步骤,而只调用setExpecteds。
从这里也可以看出,setActuals只是mapper第一次启动的时候进行一下初始化。
syncDeviceTwin定时任务
syncDeviceTwin这个定时任务每两秒执行一次
定时任务中,遍历本地缓存总的所有设备,通过modbus读取设备上最新的数据,然后更新本地的ActualVals缓存,
twinUpdateTopic
然后调用updateActual方法,通过$hw/events/device/123456789/twin/update topic推送device twin的更新
directGetTopic
接着调用UpdateDirectActuals方法,往$hw/devices/123456789/events/properties/get 的topic中推送最新的设备属性数据
两者区别
从上述分析可以看出,定时任务会通过mqtt推送两类数据,看一下他们的区别。
- directGetTopic只在每次定时任务结束的时候调用一次,而twinUpdateTopic是每个设备属性的值发生变化时,就调用一次。
- directGetTopic每次更新这个设备的所有属性,twinUpdateTopic每次只更新一个设备的一个属性
- directGetTopic有些额外的字段,比如route、content与header字段(具体用途见edge core的device twin模块分析)
Edge端
edged的eventbus会监控边缘MQTT的topic,根据topic名称和内容进行相应处理。
eventbus
首先确认一点,所有与MQTT broker的交互,都是通过eventbus来做的,eventbus从MQTT订阅消息以后,再通过beehive发送到其他模块,如devicetwin模块
// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
target = modules.TwinGroup
} else {
target = modules.HubGroup
if message.Topic() == "SYS/dis/upload_records" {
resource = "SYS/dis/upload_records"
}
}
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
resource, "response").FillBody(string(message.Payload()))
klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
beehiveContext.SendToGroup(target, *msg)
}
// BuildRouter sets route and resource operation in message
func (msg *Message) BuildRouter(source, group, res, opr string) *Message {
msg.SetRoute(source, group)
msg.SetResourceOperation(res, opr)
return msg
}
resource是通过base64编码的完整的topic的名字
target对于"
hw/events/node"这两个类型的topic,target是 twinGroup,否则是HubGroup
router key是
routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>根据BuildRouter的参数可以看出,这里的source是bus,group是user
devicetwin
参考官方文档 https://github.com/kubeedge/kubeedge/blob/master/docs/modules/edge/devicetwin.md

devicetwin从beehive中取到group为twin的消息,
然后调用classifyMsg来对消息进行分类。这里的分类方式,对devicetwin的处理流程有比较大的帮助,所以仔细看下
func classifyMsg(message *dttype.DTMessage) bool {
if EventActionMap == nil {
initEventActionMap()
}
var identity string
var action string
msgSource := message.Msg.GetSource()
if strings.Compare(msgSource, "bus") == 0 {
idLoc := 3
topic := message.Msg.GetResource()
topicByte, err := base64.URLEncoding.DecodeString(topic)
if err != nil {
return false
}
topic = string(topicByte)
klog.Infof("classify the msg with the topic %s", topic)
splitString := strings.Split(topic, "/")
if len(splitString) == 4 {
if strings.HasPrefix(topic, dtcommon.LifeCycleConnectETPrefix) {
action = dtcommon.LifeCycle
} else if strings.HasPrefix(topic, dtcommon.LifeCycleDisconnectETPrefix) {
action = dtcommon.LifeCycle
} else {
return false
}
} else {
identity = splitString[idLoc]
loc := strings.Index(topic, identity)
nextLoc := loc + len(identity)
prefix := topic[0:loc]
suffix := topic[nextLoc:]
klog.Infof("%s %s", prefix, suffix)
if v, exist := EventActionMap[prefix][suffix]; exist {
action = v
} else {
return false
}
}
message.Msg.Content = []byte((message.Msg.Content).(string))
message.Identity = identity
message.Action = action
klog.Infof("Classify the msg to action %s", action)
return true
} else if (strings.Compare(msgSource, "edgemgr") == 0) || (strings.Compare(msgSource, "devicecontroller") == 0) {
switch message.Msg.Content.(type) {
case []byte:
klog.Info("Message content type is []byte, no need to marshal again")
default:
content, err := json.Marshal(message.Msg.Content)
if err != nil {
return false
}
message.Msg.Content = content
}
if strings.Contains(message.Msg.Router.Resource, "membership/detail") {
message.Action = dtcommon.MemDetailResult
return true
} else if strings.Contains(message.Msg.Router.Resource, "membership") {
message.Action = dtcommon.MemUpdated
return true
} else if strings.Contains(message.Msg.Router.Resource, "twin/cloud_updated") {
message.Action = dtcommon.TwinCloudSync
resources := strings.Split(message.Msg.Router.Resource, "/")
message.Identity = resources[1]
return true
} else if strings.Contains(message.Msg.Router.Operation, "updated") {
resources := strings.Split(message.Msg.Router.Resource, "/")
if len(resources) == 2 && strings.Compare(resources[0], "device") == 0 {
message.Action = dtcommon.DeviceUpdated
message.Identity = resources[1]
}
return true
}
return false
} else if strings.Compare(msgSource, "edgehub") == 0 {
if strings.Compare(message.Msg.Router.Resource, "node/connection") == 0 {
message.Action = dtcommon.LifeCycle
return true
}
return false
}
return false
}
classifyMsg中首先判断的是msg的source,从代码中看,分别有bus、edgemgr、devicecontroller、edgehub四种。
从eventbus中过来的消息,就是bus;从edgehub过来的消息目前看就只有心跳;edgemgr的source从代码中没看到,目前应该是用不到;devicecontroller过来的消息应该是云端对device的操作;
这里在分析mapper与devicetwin的交互,所以先看bus的类型的消息。
对于"$hw/events/connected/%s"类型的消息,是心跳类型,返回false,也就是不需要处理
对于其他消息,也就是"$hw/events/device/123456789/twin/update"的消息,identity就是取的第四个字段,也就是device的ID
action则是根据前缀$hw/events/device/和后缀twin/update这两部分查表(EventActionMap)查出来的,
通过这两部分进行查表,最终进入到处理方法dealTwinUpdate方法中

遗留问题
twinDeltaTopic和twinGetResTopic区别
twinDeltaTopic和twinGetResTopic都可以更新设备上的值,两者的区别在哪里?
- BuildDeviceTwinResult
- BuildDeviceTwinDelta
directGetTopic没有接受方
directGetTopic从代码中看,是没有接收方到,也就是说这个消息不会被处理
mapper更新了所有属性
从modbus mapper中可以看到msg twin的一个结构,即一个key是Property名称的字典
let reply_msg = {
event_id: "",
timestamp: new Date().getTime()
};
let twin = {};
twin[property.name] = {
actual: {
value: String(value),
metadata: {
timestamp: new Date().getTime()
}
},
metadata: {
tyep: property.dataType
}
};
reply_msg.twin = twin;
所以,最终eventbus就是从modbus mapper中取出msg twin来进行后续处理。
但这里其实是有个问题的,modbus mapper的当前实现上,没有判断一个属性是否是twin属性,
直接将所有属性都做为twin属性给publish了,这个问题在后续应该会进行修复。