4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
[TOC]
引言
开源项目 Sparrow
的基础框架搭建已接近完成,中间件的基础功能大多已经具备。为了验证该框架的实用性,在工程中引入了业务模块 OneNetMqtt
。从模块命名可以推断其主要功能是通过 MQTT
协议连接 OneNet
平台。
最初接触 OneNet
还是在大学期间,当时的毕业设计基于 OneNet
实现了环境数据采集系统。由于当时的个人水平限制,并未采用 MQTT
协议实现,功能上体现的效果也不尽预期。现在重新构建此功能,弥补了旧时自身能力的不足,新的实现过程更为高效,连接和数据传输都相当稳定。本篇大致介绍一下功能和主要模块,后续根据需要补充。
<span style="font-size: 12px;">
<span style="color: blue;"> 注:文末提供本文源码获取方式。文章不定时更新,喜欢本公众号系列文章,可以星标公众号,避免遗漏干货文章。源码开源,如果对您有帮助,帮忙分享、点赞加收藏喔!</span>
</span>
概述
当前模块的实现引入了事件驱动模型的设计思想,由具体的事件触发,并结合当前模块对应的状态来决定执行的操作和响应的行为,确保能够根据实时情况动态调整,维持准确的运行状态。
从业务架构的角度,OneNetMqtt 模块被划分为三层,每层专注于特定的功能:
- OneNetManager: 作为业务管理模块,负责处理核心业务逻辑。它持有多个 OneNet 设备对象,管理设备的激活与注销,启动心跳定时器,并维护设备的状态转换等任务,以响应外部事件。
- OneNetDevice: 设备模块。主要用于存储和管理每个设备的相关信息,确保设备数据的完整性和准确性。
- OneNetDriver: 作为设备驱动模块。主要负责 Socket 状态的管理以及 MQTT 数据的编解码工作。这保证了与 OneNet 平台之间通信的稳定性和效率。
如下是代码结构:
Sparrow/Components/Business$ tree -L 2
.
├── CMakeLists.txt
└── OneNetMqtt
├── CMakeLists.txt
├── OneNetCommon.h
├── OneNetDevice.cpp
├── OneNetDevice.h
├── OneNetDriver.cpp
├── OneNetDriver.h
├── OneNetHub.cpp
├── OneNetHub.h
├── OneNetManager.cpp
├── OneNetManager.h
└── main_onenet.cpp
需求分析
通过分析物联网设备通信的特点,可以将功能需求概括为:
- 设备状态管理
- 准确追踪设备的连接状态
- Socket连接状态实时监控
- MQTT协议状态准确跟踪
- 支持状态查询和统计
- 支持优雅的状态切换
- 状态转换过程可控
- 异常状态自动恢复
- 状态切换日志记录
- 异常情况自动恢复
- 网络断开自动重连
- 协议异常自动处理
- 资源自动释放
- 消息处理机制
- 支持异步消息处理
- 消息队列管理
- 优先级处理
- 超时处理
- 消息分发准确可靠
- 基于状态的消息路由
- 消息过滤机制
- 错误消息处理
- 便于扩展新消息类型
- 消息类型注册机制
- 处理函数动态绑定
- 向后兼容支持
- 定时任务处理
- 心跳保活机制
- 可配置的心跳间隔
- 心跳超时检测
- 断线重连策略
- 数据定时上报
- 支持多种上报周期
- 数据缓存机制
- 失败重试机制
设计方案
基于上述分析,以下是设计方案的大致流程:
- 状态机设计
- ① 定义双层状态结构
- 第一层负责Socket连接状态管理
- 第二层负责MQTT协议状态管理
- 定义状态间的转换关系
- ② 实现状态转换表
- 使用表驱动方式管理状态转换
- 支持状态通配符
- 实现状态转换回调
- ③ 设计消息分发机制
- 基于观察者模式的消息通知
- 支持多接收者订阅
- 实现消息过滤功能
- 设备管理实现
- ① 设备生命周期管理
- 创建和销毁设备实例
- 管理设备连接状态
- 处理设备配置信息
- ② 资源自动回收
- 使用智能指针管理内存
- 定时清理无效连接
- 释放系统资源
- ③ 异常状态处理
- 检测异常状态
- 实现恢复策略
- 记录错误信息
详细设计
主要通过状态机和观察者模式实现设备管理,详细实现如下:
-
双层状态定义
首先定义清晰的状态枚举,包括Socket
层和MQTT
层两个维度:
// 一级状态(Socket层)
enum EOneNetMgrLev1State {
LEV1_ONENET_MGR_ANY, // 任意状态
LEV1_ONENET_MGR_IDLE, // 初始状态
LEV1_ONENET_MGR_CONNECTING, // 连接中
LEV1_ONENET_MGR_CONNECTED, // 已连接
LEV1_ONENET_MGR_DISCONNECTED, // 已断开
LEV1_ONENET_MGR_BUTT
};
// 二级状态(MQTT层)
enum EOneNetMgrLev2State {
LEV2_ONENET_MGR_ANY, // 任意状态
LEV2_ONENET_MGR_BUTT
};
这种双层状态设计的优势在于:
- 清晰分离网络层和协议层状态
- 支持ANY状态用于通配处理
- 便于扩展新的状态定义
-
状态转换表实现
使用状态转换表来管理所有可能的状态转换,每个表项定义了:源状态、目标状态、触发消息和处理函数:
vector<StateTransition<EOneNetMgrLev1State, EOneNetMgrLev2State, ESprSigId, OneNetManager, SprMsg>>
OneNetManager::mStateTable = {
// 空闲状态处理连接请求
{ LEV1_ONENET_MGR_IDLE, LEV2_ONENET_MGR_ANY,
SIG_ID_ONENET_MGR_ACTIVE_DEVICE_CONNECT,
&OneNetManager::MsgRespondActiveDeviceConnect },
// 已连接状态处理心跳
{ LEV1_ONENET_MGR_CONNECTED, LEV2_ONENET_MGR_ANY,
SIG_ID_ONENET_MGR_PING_TIMER_EVENT,
&OneNetManager::MsgRespondMqttPingTimerEvent },
// ... 其他状态转换定义
};
状态转换表的设计考虑了以下几点:
- 使用模板实现通用状态转换结构
- 支持状态和消息的通配匹配
- 每个状态转换绑定具体的处理函数
-
智能消息分发机制
实现了一个高效的消息分发器,能根据当前状态和消息类型自动匹配到对应的处理函数:
int32_t OneNetManager::ProcessMsg(const SprMsg& msg)
{
SPR_LOGD("Recv msg: %s on <%s : %s>\n", GetSigName(msg.GetMsgId()),
GetLev1StateString(mCurLev1State), GetLev2StateString(mCurLev2State));
// 查找匹配的状态处理函数
auto stateEntry = std::find_if(mStateTable.begin(), mStateTable.end(),
[this, &msg](const StateTransitionType& entry) {
return ((entry.lev1State == mCurLev1State || entry.lev1State == LEV1_ONENET_MGR_ANY) &&
(entry.lev2State == mCurLev2State || entry.lev2State == LEV2_ONENET_MGR_ANY) &&
(entry.sigId == msg.GetMsgId() || entry.sigId == SIG_ID_ANY));
});
if (stateEntry != mStateTable.end()) {
(this->*(stateEntry->callback))(msg);
}
return 0;
}
-
定时器管理
实现定时器管理机制,用于处理心跳保活和数据上报等周期性任务:
void OneNetManager::MsgRespondMqttConnAck(const SprMsg& msg)
{
// 获取设备的心跳间隔
int32_t keepAliveInSec = mOneDeviceMap[mCurActiveDevice]->GetKeepAliveIntervalInSec();
if (keepAliveInSec <= 0) {
SPR_LOGW("Invalid keep alive interval: %d, set default: %d",
keepAliveInSec, DEFAULT_PING_TIMER_INTERVAL);
keepAliveInSec = DEFAULT_PING_TIMER_INTERVAL;
}
// 启动定时器
StartTimerToPingOneNet(keepAliveInSec * 1000);
StartTimerToReportData(DEFAULT_DATA_REPORT_INTERVAL * 1000);
}
void OneNetManager::MsgRespondMqttPingTimerEvent(const SprMsg& msg)
{
// 检查设备连接状态
if (mCurLev1State != LEV1_ONENET_MGR_CONNECTED) {
SPR_LOGD("Device not connect, stop ping timer\n");
mEnablePingTimer = false;
UnregisterTimer(SIG_ID_ONENET_MGR_PING_TIMER_EVENT);
return;
}
// 发送心跳消息
NotifyMsgToOneNetDevice(mCurActiveDevice, msg);
}
-
错误处理机制
设计了统一的错误处理宏,简化错误处理代码:
#define CHECK_ONENET_RET_VALIDITY(__expr) do { \
int32_t __ret = (__expr); \
if (__ret == -1) { \
return __ret; \
} \
} while(0)
#define CHECK_ONENET_POINTER(__p, __err) do { \
if ((__p) == nullptr) { \
SPR_LOGE("INVALID POINTER: %s is nullptr!\n", (#__p)); \
return __err; \
} \
} while(0)
-
模块插件化编程机制
首先OneNetMqtt
为业务模块而非核心模块,在设计时希望将其做成可动态加载卸载的模块。即根据项目需要动态配置是否支持此业务功能。插件化编程机制在之前的文章4步实现C/C++插件化编程也有体现,这里只列举模块入口。
// The entry of OneNet business plugin
extern "C" void PluginEntry(std::map<int, SprObserver*>& observers, SprContext& ctx)
{
if (observers.find(MODULE_ONENET_DRIVER) != observers.end() && observers[MODULE_ONENET_DRIVER]) {
SPR_LOGD("OneNet driver module has been loaded!\n");
return;
}
if (observers.find(MODULE_ONENET_MANAGER) != observers.end() && observers[MODULE_ONENET_MANAGER]) {
SPR_LOGD("OneNet manager module has been loaded!\n");
return;
}
auto pOneDrv = new (std::nothrow) OneNetDriver(MODULE_ONENET_DRIVER, "OneDrv");
auto pOneMgr = new (std::nothrow) OneNetManager(MODULE_ONENET_MANAGER, "OneMgr");
gpOneNetHub = new (std::nothrow) OneNetHub("OneNetMqtt", pOneMgr);
pOneDrv->Initialize();
pOneMgr->Initialize();
gpOneNetHub->InitializeHub();
observers[MODULE_ONENET_DRIVER] = pOneDrv;
observers[MODULE_ONENET_MANAGER] = pOneMgr;
SPR_LOGD("Load plug-in OneNet modules\n");
}
// The exit of OneNet business plugin
extern "C" void PluginExit(std::map<int, SprObserver*>& observers, SprContext& ctx)
{
if (gpOneNetHub) {
delete gpOneNetHub;
gpOneNetHub = nullptr;
}
auto it = observers.find(MODULE_ONENET_DRIVER);
if (it != observers.end() && it->second) {
delete it->second;
it->second = nullptr;
observers.erase(it);
}
it = observers.find(MODULE_ONENET_MANAGER);
if (it != observers.end() && it->second) {
delete it->second;
it->second = nullptr;
observers.erase(it);
}
SPR_LOGD("Unload plug-in OneNet modules\n");
}
验证
1.验证设备成功连接
- 激活指令:
$ echo Active PC_TEST_02 > /tmp/sparrowsrv
- 终端日志打印:
12-12 22:04:37.614 78478 OneNetMgr D: 235 OneNetManager Init
...
12-12 22:04:42.179 78478 OneNetMgr D: 464 Debug Active Device [PC_TEST_02]
12-12 22:04:42.260 78478 OneNetMgr D: 393 Enable report timer, interval: 10000ms
12-12 22:05:02.260 78478 OneNetMgr D: 408 Notify module device: PC_TEST_02, msg: SIG_ID_ONENET_MGR_DATA_REPORT_TIMER_EVENT
- Onenet平台:
从日志上看PC_TEST_02
设备已经被正常激活,且数据正常上报。OneNet
平台看数据上报周期正常(10s)上报一次。
2.验证设备主动下线
- 下线指令:
$ echo Deactive > /tmp/sparrowsrv
- 终端日志打印:
12-12 22:10:15.630 78478 OneNetMgr D: 472 Debug Deactive Device
12-12 22:10:15.630 78478 OneNetMgr D: 634 Recv msg: SIG_ID_ONENET_MGR_DEACTIVE_DEVICE_DISCONNECT on <LEV1_ONENET_MGR_CONNECTED : LEV2_ONENET_MGR_ANY>
12-12 22:10:15.630 78478 OneNetMgr D: 322 Lev1 state changed: LEV1_ONENET_MGR_CONNECTED -> LEV1_ONENET_MGR_DISCONNECTED
12-12 22:10:15.630 78478 OneNetMgr D: 408 Notify module device: PC_TEST_02, msg: SIG_ID_ONENET_MGR_DEACTIVE_DEVICE_DISCONNECT
-
Onenet平台:
总结
-
事件驱动模型
设计思想在嵌入式大型项目中应用广泛,它能够有效减少复杂的if-else
条件判断语句,从而简化代码结构。此外,这种模型有助于清晰地表达业务逻辑,使系统行为与实际业务需求更加紧密对齐。 - 当然在编码实现时,也不应机械套用设计思想,而应在深刻理解业务需求的基础上,灵活运用事件驱动模型的理念或模式。确保代码不仅结构清晰、逻辑严谨,还能准确表达业务流程和状态转换。
- 在实现该业务模块时,鉴于其定位为业务模块而非核心组件,并为了探索插件化编程的优势,最终决定采用插件形式进行开发。这种方式不仅保持了系统的灵活性和可扩展性,还便于未来对该模块的独立更新和维护。
- 通过
OneNetMqtt
模块的实现,验证了Sparrow
中消息分发、RPC、定时器、配置以及插件化框架功能,目前看稳定性还可以,可供实际项目应用参考。