应用场景说明:
在经济高速发展的今天,现代人对自己的生活的要求越来越高,家电设备也迅猛增加,但是日常生活中,人们不擅长对于家电的管理,而造成了大量的不必要的能耗损失。在这样的一种情况下,如果有一套智能家居系统能够管理家庭电器的状态,我们也可以随时的控制家电,这样我们的生活效率将会有很大程度上的提高。
在这样的一个需求的大背景下,我们又对设备和设备之间,人和设备之间,进行了一番详细的分析。首先我们来看设备和设备之间的需求。
1.假设在N市的A设备的状态发生改变,我们需要远在M市的B设备的状态也发生改变,这种需求我们称为不同网段的设备间的联动需求。
2.假设在N市的A设备的状态发生改变,我们需要当前统一网段下的B设备也发生改变,这种需求我们称为同一网段的设备间的联动需求。
3.假设主人远行,而忘记了自己家有没有锁门,有没有关灯,有没有不该开启的电器设备关闭,这是我们需要得知家中设备运行的状态,并调控到最佳状态。我们称之为人机远程调控需求。
4.假设在家庭与广域网断网的情况下,我们还可以得知和控制家庭的设备,而不是失去对于家庭设备间的控制。我们称之为远程失联需求。
基于国内市场的大需求和我们自行分析的小需求下,我们设计了一套满足于以上四点控制刚性需求的智能家居控制系统。
系统结构说明
我们现在有了上面的需求分析,这时我们就可以对系统进行选型和架构了,我们对于设备间的联动需求进行分析后选择了一种物联网广泛使用的推送消息的协议机制(mqtt),然后对它进行二次开发和封装。下面我们就来看看系统的设计结构:
结构:
1.节点事件上报(publish nodeid event),这个场景用于当人在现场对设备的状态进行了改变,这时,该设备应该向主服务器进行通报,事件的发生,以及当前的状态,还有为了实现设备的热插拔,当设备连上这套系统后,它便会广播上线通知,当设备异常断开系统后,会发送遗言离线通知,方便我们对节点事件异常进行及时的处理。
2.节点属性上报(publish nodeid property),当人为的改变了设备后,主服务器和在线的控制端将会受到该设备节点的属性上报通知,这个行为主要是及时的获取设备点的状态信息。当设备刚上线是也会进行属性播报,以便控制端热加载设备。
3.节点方法被调用(subscribe nodeid call | publish 0 ack),当N市的A设备状态发生改变,M市的B设备也要发生状态的改变,就会直接让A去控制B设备,这时我们成A为控制器,B为执行器。那么A就会调用控制远程设备命令,B就会收到call命令之后执行命令并返回一个ack以确认信息的无误性。
4.系统广播事件(subscribe 0 system),当所有的设备同时接受统一命令的调控时,我们为了提高信息处理的效率使用系统广播事件来统一调度。
在这五个控制总命令下,我们还将设计针对每种设备的控制子命令格式。从而达到既从属分布式控制有归属于集中式控制系统。
智能家居互联的通讯协议:
1.角色定义:
节点,设备,控制器,服务器
2.主题结构:
yqmiot/<accountid>/<receiver>/<sender>/<command>
3.消息结构:
{
receiver: , # 接受者nodeid
sender: , # 发送者nodeid
name: , # 主命令(名字有带商定)
action: , # 子命令(可为null)
callseq: , # 调用序号(多次调用时确定回包对应的请求) (非call和ack命令可以为null)
params: , # 命令参数
# seq: , # 包序号(用户筛选重复数据包) 暂未使用
}
备注:receiver,sender,name 未来这三者在发送数据包中可能被省略,因主题中已经存在。
属性上报(property)
-command: "property"
-params: 设备属性 ({"name": "hello", "status": "正忙呢", "yqmiot.property.nodeid": 27888})
事件上报(event)
-command: "event"
-action: 事件名 ("yqmiot.event.online", "yqmiot.event.offline")
-params: 事件参数
方法调用(call)
-command: "call"
-action: 方法名 ("yqmiot.method.ping", "yqmiot.method.test")
-callseq: 调用序号(每次调用都必须唯一)
-params: 方法参数
调用响应(ack)
-command: "ack"
-action: call包中的action
-callseq: call包中的seq
-params: 回应参数
其他(暂未使用)
服务器 nodeid: 0
全频道广播 nodeid: 0xffffffff
全服广播 accountid: 0, nodeid: 0
我们把通信协议搭建好了之后,就来开始构建整个系统,接下来就是要使用编程语言进行编程实现。从协议开始,一步一步构建 整套系统的通讯层和应用层,以及控制端。
系统的构建:
1.设备控制端的构建:
我们是基于可以运行嵌入式linux系统的设备,对节点进行控制。由于linux系统的便利性。我们使用了python这种脚本对设备客户端进行了编程处理,接下来我们一步步的看,被控器的客户端构建。
1.1.引入依赖包和常用参数.
# -*- encoding: utf-8 -*-
importlogging
importtime
importsys
importgetopt
importjson
frompaho.mqtt.clientimportClientasMqtt
VERSION ="1.0.1"
"""
每个设备都拥有三类特性:属性,事件,方法。
属性表示设备的当前状态,比如:电力状态,照明开关等。每当属性发生改变就会立即上报。
事件表示设备当前发生了什么,按下按钮,电力不足警告等。
方法则是设备对外提供的操作接口,通过它可以对设备进行控制。比如:重启,打开照明,关机等。
"""
YQMIOT_OK =0
YQMIOT_TIMEOUT =1
YQMIOT_BROADCAST_RECEIVER =0# 广播接受者id
# 系统命令
YQMIOT_COMMAND_PROPERTY ="property"# 属性上报
YQMIOT_COMMAND_EVENT ="event"# 事件上报
YQMIOT_COMMAND_CALL ="call"# 方法调用
YQMIOT_COMMAND_ACK ="ack"# 方法响应
# 系统事件
YQMIOT_EVENT_ONLINE ="yqmiot.event.online"# 上线通知
YQMIOT_EVENT_OFFLINE ="yqmiot.event.offline"# 下线通知
YQMIOT_EVENT_TEST ="yqmiot.event.test"# 按下测试按钮
# 系统属性
YQMIOT_PROPERTY_NODEID ="yqmiot.property.nodeid"# 节点id号
YQMIOT_PROPERTY_ACCOUNTID ="yqmiot.property.accountid"# 节点所在账号id(频道id)频道隔离
YQMIOT_PROPERTY_MODEL ="yqmiot.property.model"# 设备所属类型
YQMIOT_PROPERTY_VERSION ="yqmiot.property.version"# 设备所属固件版本号
# 系统方法
YQMIOT_METHOD_PING ="yqmiot.method.ping"# ping连通测试
YQMIOT_METHOD_TEST ="yqmiot.method.test"# 方法调用测试
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
root = logging.getLogger()
root.setLevel(logging.NOTSET)
1.2.mqtt通讯层的基本封装
classMqttClient(object):
"""Mqtt通讯封装"""
def__init__(self,address):
ifnotisinstance(address,tuple) orlen(address) !=2:
raiseValueError("Invalid address.")
defon_connect(client,userdata,flags,rc):
self.handleConnected()
defon_message(client,userdata,msg):
self.handleMessage(msg.topic, msg.payload)
self.client = Mqtt()
self.address = address
self.client.on_connect = on_connect
self.client.on_message = on_message
defhandleConnected(self):
pass
defhandleMessage(self,topic,payload):
pass
defpublish(self,topic,payload=None,qos=0,retain=False):
self.client.publish(topic, payload, qos, retain)
defsubscribe(self,topic,qos=0):
self.client.subscribe(topic, qos)
defstart(self):
self.client.connect_async(self.address[0],self.address[1])
self.client.loop_start()
defstop(self):
self.client.loop_stop()
defusername_pw_set(self,username,password=None):
self.client.username_pw_set(username, password)
defwill_set(self,topic,payload=None,qos=0,retain=False):
self.client.will_set(topic, payload, qos, retain)
1.3.家居互联通讯层封装
classYqmiotBase(MqttClient):
"""月球猫互联通讯基类"""
def__init__(self,address,accountid,nodeid,authkey=None,username=None,password=None):
"""username和password是mqtt账号密码。"""
super(YqmiotBase,self).__init__(address)
self.username = username
self.password = password
self.accountid = accountid
self.nodeid = nodeid
self.authkey = authkey#TODO
self.callMethodInfo = {}#
self.callMethodTimeout =10*1000# 方法调用超时时间TODO处理多线程问题。调用超时
self.callseq =0
ifself.accountid <=0orself.nodeid <=0:
raiseValueError("Invalid accountid or nodeid.")
defhandleConnected(self):
super(YqmiotBase,self).handleConnected()
# 侦听发送给自己的消息
topic ="yqmiot/{self.accountid}/{self.nodeid}/#".format(self=self)
self.subscribe(topic)
defhandleMessage(self,topic,payload):
super(YqmiotBase,self).handleMessage(topic, payload)
try:
prefix, account, receiver, sender, command = topic.split("/")
account =int(account)
receiver =int(receiver)
sender =int(sender)
except:
logging.error("Invalid topic. {}".format(topic))
return
# if prefix != "yqmiot" \
# or account != self.accountid \
# or receiver != self.nodeid: #TODO处理广播
# logging.error("It's not my topic. {}".format(topic))
# return
try:
payload = json.loads(payload)
except:
logging.error("Invalid payload. {}".format(payload))
return
cmd = Command(
name= command,
action= payload.get("action"),
receiver= receiver,
sender= sender,
callseq= payload.get("callseq"),
params= payload.get("params"))
try:
self.handleCommand(cmd)
except:
logging.error("Error processing command. {}".format(topic))
return
defsendCommand(self,cmd):
ifcmd:
try:
accountid =self.accountid
receiver = cmd.receiverifcmd.receiver !=NoneelseYQMIOT_BROADCAST_RECEIVER# 默认接受者是服务器
sender =self.nodeid
name = cmd.name
action = cmd.action
callseq = cmd.callseq
params = cmd.paramsifcmd.params !=Noneelse{}
topic ="yqmiot/{}/{}/{}/{}".format(accountid, receiver, sender, name)
payload = {"action": cmd.action,"callseq": callseq,"params": params}
self.publish(topic, json.dumps(payload))
exceptException, e:
logging.error("Error sending command."+str(e))
else:
logging.error("Invalid cmd.")
defhandleCommand(self,cmd):
ifcmd.name == YQMIOT_COMMAND_CALL:
self.handleCommandCall(cmd)
elifcmd.name == YQMIOT_COMMAND_ACK:
callseq = cmd.callseq
ifcallseq inself.callMethodInfo:
info =self.callMethodInfo.pop(callseq)
cmd.action = info["action"]
cmd.time = millis() - info["time"]
self.handleCommandAck(cmd)
else:
logging.error("Drop unknown command.")
else:
logging.error("Command not supported.")
defhandleCommandCall(self,cmd):
ifcmd.action == YQMIOT_METHOD_PING:
self.handleCommandCallPing(cmd)
else:
logging.warn("Could not find method.")
defhandleCommandAck(self,cmd):
ifcmd.action == YQMIOT_METHOD_PING:
self.handleCommandCallPingAck(cmd)
defcallMethod(self,receiver,action,params=None):
ifreceiver and receiver != YQMIOT_BROADCAST_RECEIVER and action:
try:
self.callseq +=1
cmd = Command(
name= YQMIOT_COMMAND_CALL,
action= action,
receiver= receiver,
callseq=self.callseq,
params= params)
self.callMethodInfo[cmd.callseq] = {"action": action,"callseq": cmd.callseq,"time": millis()}
self.sendCommand(cmd)
except:
logging.error("Error calling remote action.")
else:
logging.error("Remote action parameter is incorrect.")
defcallMethodPing(self,receiver):
self.callMethod(receiver, YQMIOT_METHOD_PING)
defhandleCommandCallPing(self,cmd):
self.sendCommand(cmd.reply())
defhandleCommandCallPingAck(self,cmd):
pass
1.4.互联客户端封装
classYqmiotClient(YqmiotBase):
"""月球猫互联客户端
属性定时上报
属性变更上报
事件上报
处理方法调用,并回包"""
defstart(self):
# 离线通知
topic ="yqmiot/{}/{}/{}/{}".format(self.accountid, YQMIOT_BROADCAST_RECEIVER,self.nodeid, YQMIOT_COMMAND_EVENT)
payload = {"action": YQMIOT_EVENT_OFFLINE}
self.will_set(topic, json.dumps(payload))
super(YqmiotClient,self).start()
defhandleConnected(self):
super(YqmiotClient,self).handleConnected()
logging.info("Connect server successfully.")
# 上线通知
self.reportEvent(YQMIOT_EVENT_ONLINE)
#TODO推送下线遗言
defreportProperty(self,params):
"""属性上报
params(dict) 设备属性集"""
ifisinstance(params,dict):
try:
cmd = Command(
name= YQMIOT_COMMAND_PROPERTY,
receiver= YQMIOT_BROADCAST_RECEIVER,
params= params)
self.sendCommand(cmd)
except:
logging.error("An error occurred while reporting the property.")
else:
raiseTypeError("Incorrect params type.")
defreportEvent(self,action,params=None):
"""事件上报
action 事件名
params 参数"""
ifaction:
try:
cmd = Command(
name= YQMIOT_COMMAND_EVENT,
action= action,
receiver= YQMIOT_BROADCAST_RECEIVER,
params= params)
self.sendCommand(cmd)
except:
logging.error("An error occurred while reporting the event.")
else:
raiseTypeError("Incorrect action type.")
1.5.家居系统互联控制器封装
classYqmiotController(YqmiotBase):
"""
月球猫互联控制器
"""
# 订阅广播消息
defhandleConnected(self):
super(YqmiotController,self).handleConnected()
logging.info("Connect server successfully.")
# 侦听设备上报
topic ="yqmiot/{self.accountid}/0/#".format(self=self)
self.subscribe(topic)
defhandleCommand(self,cmd):
ifcmd.name == YQMIOT_COMMAND_PROPERTY:
self.handleCommandProperty(cmd)
elifcmd.name == YQMIOT_COMMAND_EVENT:
self.handleCommandEvent(cmd)
else:
super(YqmiotController,self).handleCommand(cmd)
defhandleCommandProperty(self,cmd):
print"设备 {} 上报属性:{}".format(cmd.sender, cmd.params)
defhandleCommandEvent(self,cmd):
print"设备 {} 上报事件:{} 参数:{}".format(cmd.sender, cmd.action, cmd.params)
到这里为止,我们的控制系统的客户端已经封装完毕,但是这才刚刚起步,我们有了客户端,那我们还需要远程控制器,我们为了简便起见使用了web终端的方案。来进行对设备客户端的控制,由于代码量很大我这里就简要的介绍一下。
在控制端中主要使用的mqtt推送协议,然后转换成socket以便实时控制。因为我们的技术栈使用的是vuejs,大家如果不了解可以先去了解了解,这是一种以数据为驱动的web解决方案,告别了传统的dom节点控制。使得运行速度和性能得到了很大的提升。我们在控制得到socket数据后,然后进行分发进入各种控制器,分别管理不同数据和业务逻辑的实现以及数据的调配。
实践效果:
好下面我们就来看看最后达到的控制效果吧!