本文以 luatos-docs-code 为辅助工具,搭建 LuatOS 温湿度 MQTT 上报工程。硬件载体为 Air780EPM 通信模组,网络层配置固定 MQTT 服务地址与端口,内置连接异常重连机制;应用层定时轮询 AirSHT30_1000 传感器,以一分钟为周期上传采集数据。
代码层面实行分层解耦,由 main.lua 作为入口文件,搭配多个独立功能 Lua 模块;入口文件需完善项目名、版本号注释,增补 FOTA 升级、errDump 调试说明,通过 require 加载业务模块并调用 sys.run () 启动工程。新建 mqtt_temprature_humdity 目录统一收纳全部工程脚本与资源。
一、概述
在本文中,我们来演示如何使用 luatos-docs-code 协助开发 MQTT上传温湿度数据 项目脚本代码;
二、项目功能需求
生成一个 LuatOS 项目代码,功能需求如下:
硬件模组:Air780EPM
软件功能需求:
支持 MQTT 长连接,MQTT 服务器
MQTT 连接出现异常后,支持重连
每隔一分钟读取 AirSHT30_1000 的温湿度数据,将读取到的温湿度数据上传到 MQTT 服务器
软件代码设计要求:
项目脚本要包含 main.lua 和一个或者多个其他具体功能应用的 lua 文件,main.lua 文件、其他 lua 文件功能都要模块化解耦设计
main.lua 中,要有最基本的项目名,版本号,fota 升级使用说明,errDump 使用说明,require 其他功能应用的 lua 文件,sys.run()代码
代码存储路径:新建一个 mqtt_temprature_humdity 文件夹,将项目的脚本文件和资源文件都保存到这个文件夹
三、使用 luatos-docs-code 开发以及调试过程
3.1 输入项目需求,luatos-docs-code 第一次生成代码
[图片上传失败...(image-ddbb1e-1778574423203)]
3.2 生成的代码存在的问题
刚才生成的代码存在以下问题:
3.2.1 main.lua 编程规范需要改进
要遵循 Air780EPM demo 代码的以下编程规范
- main.lua 中,要有最基本的项目名,版本号,fota 升级使用说明,errDump 使用说明,require 其他功能应用的 lua 文件,sys.run()代码,不需要 require "sys",请你找一个 demo 目录下的 main.lua 文件学习一下,然后完全遵守 demo 中 main.lua 的格式和规范,注释也要详细
3.2.2 获取 4G 模组信号强度的函数好像不对
请你再修复一下
3.3 告知 luatos-docs-code 存在的问题,让它重新生成代码
3.3.1 修复 main.lua
将第一个问题告知 luatos_docs_code 后,让它重新生成,交互过程如下

这次生成的 main.lua 代码质量就很高了,给他提的这个问题已经解决!
3.3.2 修复获取 4G 模组信号强度的函数使用

3.4 最终生成的项目脚本代码
3.4.1 项目代码目录结构
mqtt_temprature_humdity/
├── main.lua # 项目主入口文件
├── config.lua # 配置文件
├── app_mqtt_main.lua # MQTT 客户端主应用模块
├── app_mqtt_sender.lua # MQTT 数据发送模块
├── app_mqtt_receiver.lua # MQTT 数据接收模块
├── app_sht30.lua # 温湿度传感器应用模块
└── AirSHT30_1000.lua # 温湿度传感器驱动文件
3.4.2 main.lua
1
--[[
2
@module main
3
@summary LuatOS用户应用脚本文件入口,总体调度应用逻辑
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本demo演示的核心功能为:
9
1. 支持MQTT长连接,MQTT服务器地址,端口为1234
10
2. MQTT连接出现异常后,支持自动重连
11
3. 每隔一分钟读取AirSHT30_1000的温湿度数据
12
4. 将读取到的温湿度数据以及4G模组的信号强度上传到MQTT服务器
13
更多说明参考本目录下的readme.md文件
14
]]
15
--[[
16
必须定义PROJECT和VERSION变量,Luatools工具会用到这两个变量,远程升级功能也会用到这两个变量
17
PROJECT:项目名,ascii string类型
18
可以随便定义,只要不使用,就行
19
VERSION:项目版本号,ascii string类型
20
如果使用iot平台进行远程升级,必须按照"XXX.YYY.ZZZ"三段格式定义: X、Y、Z各表示1位数字,三个X表示的数字可以相同,也可以不同,同理三个Y和三个Z表示的数字也是可以相同,可以不同 因为历史原因,YYY这三位数字必须存在,但是没有任何用处,可以一直写为999
21
如果不使用iot平台进行远程升级,根据自己项目的需求,自定义格式即可
22
]]
23
PROJECT = "MQTT_TEMP_HUMIDITY"
24
VERSION = "001.999.000"
25
log.info("main", "project name is ", PROJECT, "version is ", VERSION)
26
-- 如果内核固件支持errDump功能,此处进行配置,【强烈建议打开此处的注释】
27
-- 因为此功能模块可以记录并且上传脚本在运行过程中出现的语法错误或者其他自定义的错误信息,可以初步分析一些设备运行异常的问题
28
-- 以下代码是最基本的用法,更复杂的用法可以详细阅读API说明文档
29
-- 启动errDump日志存储并且上传功能,600秒上传一次
30
-- if errDump then
31
-- errDump.config(true, 600)
32
-- end
33
-- 使用LuatOS开发的任何一个项目,都强烈建议使用远程升级FOTA功能
34
-- 可以使用的iot平台进行远程升级
35
-- 也可以使用客户自己搭建的平台进行远程升级
36
-- 远程升级的详细用法,可以参考fota的demo进行使用
37
-- 启动一个循环定时器
38
-- 每隔3秒钟打印一次总内存,实时的已使用内存,历史最高的已使用内存情况
39
-- 方便分析内存使用是否有异常
40
-- sys.timerLoopStart(function()
41
-- log.info("mem.lua", rtos.meminfo())
42
-- log.info("mem.sys", rtos.meminfo("sys"))
43
-- end, 3000)
44
-- 加载配置文件
45
require "config"
46
-- 加载MQTT客户端主应用模块
47
require "app_mqtt_main"
48
-- 加载MQTT客户端数据发送模块
49
require "app_mqtt_sender"
50
-- 加载MQTT客户端数据接收模块
51
require "app_mqtt_receiver"
52
-- 加载温湿度传感器应用模块
53
require "app_sht30"
54
-- 用户代码已结束---------------------------------------------
55
-- 结尾总是这一句
56
sys.run()
57
-- sys.run()之后后面不要加任何语句!!!!!
3.4.3 config.lua
1
--[[
2
@module config
3
@summary 配置文件
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件包含项目的配置信息
9
]]
10
local config = {}
11
-- MQTT服务器配置
12
config.mqtt = {
13
server_addr = "MQTT服务器", -- MQTT服务器地址
14
server_port = 1234, -- MQTT服务器端口
15
client_id = "", -- 客户端ID,将在运行时设置为IMEI
16
username = "", -- MQTT用户名
17
password = "", -- MQTT密码
18
keepalive = 120, -- 心跳间隔(秒)
19
qos = 0 -- QoS等级
20
}
21
-- 温湿度传感器配置
22
config.sht30 = {
23
i2c_id = 0, -- I2C总线ID
24
slave_addr = 0x44 -- 传感器从机地址
25
}
26
-- 数据上传配置
27
config.data = {
28
upload_interval = 60 * 1000, -- 上传间隔(毫秒)
29
topic = "/up" -- MQTT上传主题后缀
30
}
31
return config
3.4.4 AirSHT30_1000.lua
1
--[[
2
@module AirSHT30_1000
3
@summary AirSHT30_1000应用功能模块
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件为AirSHT30_1000驱动配置文件,核心业务逻辑为:
9
1、打开AirSHT30_1000;
10
2、读取温湿度数据;
11
本文件没有对外接口,直接require "AirSHT30_1000"就可以加载运行;
12
]]
13
-- 本文件中的主机是指I2C主机,具体指Air780EXX系列每个模组
14
-- 本文件中的从机是指I2C从机,具体指AirSHT30_1000配件板上的sht30温湿度传感器芯片
15
local AirSHT30_1000 ={
16
-- i2c_id:主机的i2c id;
17
}
18
-- 从机地址为0x44
19
local slave_addr = 0x44
20
-- 计算数据表data中所有数据元素的crc8校验值
21
local function crc8(data)
22
local crc = 0xFF
23
for i = 1, #data do
24
crc = bit.bxor(crc, data[i])
25
for j = 1, 8 do crc = crc * 2 if crc >= 0x100 then crc = bit.band(bit.bxor(crc, 0x31), 0xff) end
26
end
27
end
28
return crc
29
end
30
-- 打开AirSHT30_1000;
31
--i2c_id:number类型;
32
-- 主机使用的I2C ID,用来控制AirSHT30_1000;
33
-- 取值范围:仅支持0和1;
34
-- 如果没有传入此参数,则默认为0;
35
--返回值:成功返回true,失败返回false
36
function AirSHT30_1000.open(i2c_id)
37
--如果i2c_id为nil,则赋值为默认值0
38
if i2c_id==nil then i2c_id=0 end
39
--检查参数的合法性
40
if not (i2c_id == 0 or i2c_id == 1) then
41
log.error("AirSHT30_1000.open", "invalid i2c_id", i2c_id)
42
return false
43
end
44
AirSHT30_1000.i2c_id = i2c_id
45
--初始化I2C
46
if i2c.setup(i2c_id, i2c.FAST) ~= 1 then
47
log.error("AirSHT30_1000.open", "i2c.setup error", i2c_id)
48
return false
49
end
50
return true
51
end
52
-- 读取温湿度数据;
53
-- 返回值:失败返回false;
54
-- 成功返回两个值,第一个为摄氏温度值(number类型,例如23.6表示23.6摄氏度),第二个为百分比湿度值(number类型,例如67表示67%的湿度)
55
function AirSHT30_1000.read()
56
-- 发送启动测量命令(高精度)
57
i2c.send(AirSHT30_1000.i2c_id, slave_addr, {0x24, 0x00})
58
-- 等待测量完成(SHT30高精度测量需~15ms)
59
sys.wait(20)
60
-- 读取6字节数据(温度高/低 + CRC,湿度高/低 + CRC)
61
local data = i2c.recv(AirSHT30_1000.i2c_id, slave_addr, 6)
62
-- 如果没有读取到6字节数据
63
if type(data)~="string" or data:len()~=6 then
64
log.error("AirSHT30_1000.read", "i2c.recv error")
65
return false
66
end
67
-- log.info("AirSHT30_1000.read", data:toHex())
68
--如果校验值正确
69
if crc8({data:byte(1), data:byte(2)}) == data:byte(3) and crc8({data:byte(4), data:byte(5)}) == data:byte(6) then
70
-- 提取原始温度值
71
local temp_raw = (data:byte(1) << 8) | data:byte(2)
72
-- 提取原始湿度值
73
local hum_raw = (data:byte(4) << 8) | data:byte(5)
74
-- 转换为实际值(根据SHT30数据手册公式)
75
local temprature = (-45 + 175 * temp_raw / 65535.0)
76
local humidity = (100 * hum_raw / 65535.0)
77
-- 打印输出结果(保留2位小数)
78
-- log.info("AirSHT30_1000.read", "temprature", string.format("%.2f ℃", temprature))
79
-- log.info("AirSHT30_1000.read", "temprature", string.format("%.2f %%RH", humidity))
80
return temprature, humidity
81
else
82
log.error("AirSHT30_1000.read", "crc error", i2c_id)
83
return false
84
end
85
end
86
-- 关闭AirSHT30_1000;
87
-- 返回值:成功返回true,失败返回false
88
function AirSHT30_1000.close()
89
--close接口没有返回值,理论上不会关闭失败
90
i2c.close(AirSHT30_1000.i2c_id)
91
return true
92
end
93
return AirSHT30_1000
3.4.5 app_sht30.lua
1
--[[
2
@module app_sht30
3
@summary 温湿度传感器应用模块
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件为温湿度传感器应用模块,核心业务逻辑为:
9
1、初始化AirSHT30_1000传感器;
10
2、每分钟读取一次温湿度数据;
11
3、读取4G模组的信号强度;
12
4、将数据打包成JSON格式并上传到MQTT服务器;
13
本文件没有对外接口,直接在main.lua中require "app_sht30"就可以加载运行;
14
]]
15
-- 加载配置文件
16
local config = require "config"
17
-- 加载AirSHT30_1000驱动
18
local AirSHT30_1000 = require "AirSHT30_1000"
19
-- 主题前缀:IMEI号
20
local TOPIC_PREFIX = mobile.imei()
21
-- 数据发送结果回调函数
22
local function send_data_cbfunc(result)
23
log.info("app_sht30", "send data result", result)
24
end
25
-- 读取并上传温湿度数据的任务函数
26
local function sht30_task()
27
-- 打开AirSHT30_1000传感器
28
local open_result = AirSHT30_1000.open(config.sht30.i2c_id)
29
if not open_result then
30
log.error("app_sht30", "AirSHT30_1000 open failed")
31
return
32
end
33
-- 启动MQTT长连接
34
sys.publish("MQTT_CLIENT_RUN_REQ", true)
35
while true do
36
-- 读取温湿度数据
37
local temperature, humidity = AirSHT30_1000.read()
38
if temperature and humidity then -- 读取4G信号强度 local csq = mobile.csq() -- 构建JSON数据 local data = { temperature = temperature, humidity = humidity, signal = csq, timestamp = os.time() } local json_data = json.encode(data) log.info("app_sht30", "read data", json_data) -- 发送数据到MQTT服务器 sys.publish("SEND_DATA_REQ", "sht30", TOPIC_PREFIX .. config.data.topic, json_data, config.mqtt.qos, {func = send_data_cbfunc})
39
else log.error("app_sht30", "read data failed")
40
end
41
-- 等待指定的上传间隔
42
sys.wait(config.data.upload_interval)
43
end
44
end
45
-- 启动温湿度传感器任务
46
sys.taskInit(sht30_task)
3.4.6 app_mqtt_main.lua
1
--[[
2
@module app_mqtt_main
3
@summary MQTT客户端主应用模块
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件为MQTT客户端主应用模块,核心业务逻辑为:
9
1、创建MQTT客户端,连接服务器;
10
2、处理连接/订阅/取消订阅/异常逻辑,出现异常后执行重连动作;
11
3、调用mqtt_receiver的外部接口mqtt_receiver.proc,对接收到的publish数据进行处理;
12
4、调用sys.sendMsg接口,发送"CONNECT OK"、"PUBLISH OK"和"DISCONNECTED"三种类型的"MQTT_EVENT"消息到mqtt_sender的task,控制publish数据发送逻辑;
13
5、收到MQTT心跳应答后,执行sys.publish("FEED_NETWORK_WATCHDOG") 对网络环境检测看门狗功能模块进行喂狗;
14
本文件没有对外接口,直接在main.lua中require "app_mqtt_main"就可以加载运行;
15
]]
16
-- 加载配置文件
17
local config = require "config"
18
-- 加载mqtt client数据接收功能模块
19
local mqtt_receiver = require "app_mqtt_receiver"
20
-- 加载mqtt client数据发送功能模块
21
local mqtt_sender = require "app_mqtt_sender"
22
-- mqtt服务器地址和端口
23
local SERVER_ADDR = config.mqtt.server_addr
24
local SERVER_PORT = config.mqtt.server_port
25
-- mqtt_main的任务名
26
local TASK_NAME = mqtt_sender.TASK_NAME_PREFIX.."main"
27
-- mqtt主题的前缀:IMEI号
28
local TOPIC_PREFIX = mobile.imei()
29
-- mqtt client的事件回调函数
30
local function mqtt_client_event_cbfunc(mqtt_client, event, data, payload, metas)
31
log.info("mqtt_client_event_cbfunc", mqtt_client, event, data, payload, json.encode(metas))
32
-- mqtt连接成功
33
if event == "conack" then
34
sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", true)
35
-- 订阅单主题
36
-- 第二个参数表示qos,取值范围为0,1,2,如果不设置,默认为0
37
-- if not mqtt_client:subscribe(TOPIC_PREFIX .. "/down") then
38
-- sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1)
39
-- end
40
-- 订阅多主题,如果有需要,打开注释
41
-- 表中的每一个订阅主题的格式为[topic]=qos
42
-- if not mqtt_client:subscribe(
43
-- {
44
-- [(TOPIC_PREFIX .. "/data"]=0,
45
-- [(TOPIC_PREFIX .. "/cmd"]=1
46
-- }
47
-- ) then
48
-- sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1)
49
-- end
50
-- 订阅结果
51
-- data:订阅应答结果,true为成功,false为失败
52
-- payload:number类型;成功时表示qos,取值范围为0,1,2;失败时表示失败码,一般是0x80
53
elseif event == "suback" then
54
-- 发送消息通知 mqtt main task
55
sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", data, payload)
56
-- 取消订阅成功
57
elseif event == "unsuback" then
58
-- 发送消息通知 mqtt main task
59
sys.sendMsg(TASK_NAME, "MQTT_EVENT", "UNSUBSCRIBE", true)
60
-- 接收到服务器下发的publish数据
61
-- data:string类型,表示topic
62
-- payload:string类型,表示payload
63
-- metas:table类型,数据内容如下
64
-- {
65
-- qos: number类型,取值范围0,1,2
66
-- retain:number类型,取值范围0,1
67
-- dup:number类型,取值范围0,1
68
-- message_id: number类型
69
-- }
70
elseif event == "recv" then
71
-- 对接收到的publish数据处理
72
mqtt_receiver.proc(data, payload, metas)
73
-- 发送成功publish数据
74
-- data:number类型,表示message id
75
elseif event == "sent" then
76
-- 发送消息通知 mqtt sender task
77
sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_OK", data)
78
-- 服务器断开mqtt连接
79
elseif event == "disconnect" then
80
-- 发送消息通知 mqtt main task
81
sys.sendMsg(TASK_NAME, "MQTT_EVENT", "DISCONNECTED", false)
82
-- 收到服务器的心跳应答
83
elseif event == "pong" then
84
-- 接收到数据,通知网络环境检测看门狗功能模块进行喂狗
85
sys.publish("FEED_NETWORK_WATCHDOG")
86
-- 严重异常,本地会主动断开连接
87
-- data:string类型,表示具体的异常,有以下几种:
88
-- "connect":tcp连接失败
89
-- "tx":数据发送失败
90
-- "conack":mqtt connect后,服务器应答CONNACK鉴权失败,失败码为payload(number类型)
91
-- "other":其他异常
92
elseif event == "error" then
93
if data == "connect" or data == "conack" then -- 发送消息通知 mqtt main task,连接失败 sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", false)
94
elseif data == "other" or data == "tx" then -- 发送消息通知 mqtt main task,出现异常 sys.sendMsg(TASK_NAME, "MQTT_EVENT", "ERROR")
95
end
96
end
97
end
98
-- mqtt main task 的任务处理函数
99
-- auto_reconnect:短连接还是长连接;长连接为true,表示断开连接后会自动重连;短连接为false,表示断开连接后,不会自动重连
100
local function mqtt_client_main_task_func(auto_reconnect)
101
local mqtt_client
102
local result, msg
103
while true do
104
-- 如果当前时间点设置的默认网卡还没有连接成功,一直在这里循环等待
105
while not socket.adapter(socket.dft()) do log.warn("mqtt_client_main_task_func", "wait IP_READY", socket.dft()) -- 在此处阻塞等待默认网卡连接成功的消息"IP_READY" -- 或者等待1秒超时退出阻塞等待状态; -- 注意:此处的1000毫秒超时不要修改的更长; -- 因为当使用exnetif.set_priority_order配置多个网卡连接外网的优先级时,会隐式的修改默认使用的网卡 -- 当exnetif.set_priority_order的调用时序和此处的socket.adapter(socket.dft())判断时序有可能不匹配 -- 此处的1秒,能够保证,即使时序不匹配,也能1秒钟退出阻塞状态,再去判断socket.adapter(socket.dft()) sys.waitUntil("IP_READY", 1000)
106
end
107
-- 检测到了IP_READY消息
108
log.info("mqtt_client_main_task_func", "recv IP_READY", socket.dft())
109
-- 清空此task绑定的消息队列中的未处理的消息
110
sys.cleanMsg(TASK_NAME)
111
-- 创建mqtt client对象
112
mqtt_client = mqtt.create(nil, SERVER_ADDR, SERVER_PORT)
113
-- 如果创建mqtt client对象失败
114
if not mqtt_client then log.error("mqtt_client_main_task_func", "mqtt.create error") goto EXCEPTION_PROC
115
end
116
-- 配置mqtt client对象的client id,username,password和clean session标志
117
result = mqtt_client:auth(TASK_NAME..mobile.imei(), config.mqtt.username, config.mqtt.password, true)
118
-- 如果配置失败
119
if not result then log.error("mqtt_client_main_task_func", "mqtt_client:auth error") goto EXCEPTION_PROC
120
end
121
-- 注册mqtt client对象的事件回调函数
122
mqtt_client:on(mqtt_client_event_cbfunc)
123
-- 设置mqtt keepalive时间
124
mqtt_client:keepalive(config.mqtt.keepalive)
125
-- 设置遗嘱消息,有需要的话,可以打开注释
126
-- mqtt_client:will(TOPIC_PREFIX .. "/status", "offline")
127
-- 连接server
128
result = mqtt_client:connect()
129
-- 如果连接server失败
130
if not result then log.error("mqtt_client_main_task_func", "mqtt_client:connect error") goto EXCEPTION_PROC
131
end
132
-- 连接、断开连接、订阅、取消订阅、异常等各种事件的处理调度逻辑
133
while true do -- 等待"MQTT_EVENT"消息 msg = sys.waitMsg(TASK_NAME, "MQTT_EVENT") log.info("mqtt_client_main_task_func waitMsg", msg[2], msg[3], msg[4]) -- connect连接结果 -- msg[3]表示连接结果,true为连接成功,false为连接失败 if msg[2] == "CONNECT" then -- mqtt连接成功 if msg[3] then log.info("mqtt_client_main_task_func", "connect success") -- 通知mqtt sender数据发送应用模块的task,MQTT连接成功 sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "CONNECT_OK", mqtt_client) -- mqtt连接失败 else log.info("mqtt_client_main_task_func", "connect error") -- 退出循环,发起重连 break end -- subscribe订阅结果 -- msg[3]表示订阅结果,true为订阅成功,false为订阅失败 elseif msg[2] == "SUBSCRIBE" then -- 订阅成功 if msg[3] then log.info("mqtt_client_main_task_func", "subscribe success", "qos: "..(msg[4] or "nil")) -- 订阅失败 else log.error("mqtt_client_main_task_func", "subscribe error", "code", msg[4]) -- 主动断开mqtt client连接 mqtt_client:disconnect() -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器; -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能; sys.wait(1000) break end -- unsubscribe取消订阅成功 elseif msg[2] == "UNSUBSCRIBE" then log.info("mqtt_client_main_task_func", "unsubscribe success") -- 需要主动关闭mqtt连接 -- 用户需要主动关闭mqtt连接时,可以调用sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CLOSE") elseif msg[2] == "CLOSE" then -- 主动断开mqtt client连接 mqtt_client:disconnect() -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器; -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能; -- sys.wait(1000) break -- 被动关闭了mqtt连接 -- 被网络或者服务器断开了连接 elseif msg[2] == "DISCONNECTED" then break -- 出现了其他异常 elseif msg[2] == "ERROR" then break end
134
end
135
-- 出现异常
136
::EXCEPTION_PROC::
137
-- 清空此task绑定的消息队列中的未处理的消息
138
sys.cleanMsg(TASK_NAME)
139
-- 通知mqtt sender数据发送应用模块的task,MQTT连接已经断开
140
sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "DISCONNECTED")
141
-- 如果存在mqtt client对象
142
if mqtt_client then -- 关闭mqtt client,并且释放mqtt client对象 mqtt_client:close() mqtt_client = nil
143
end
144
-- 根据是否需要自动重连标志,决定后续执行的动作
145
if auto_reconnect then -- 5秒后跳转到循环体开始位置,自动发起重连 sys.wait(5000)
146
else sys.publish("MQTT_CLIENT_CLOSE_RSP") log.warn("mqtt_client_main_task_func", "exit") break
147
end
148
end
149
-- 清除此task对应的管理表资源
150
sys.taskDel(TASK_NAME)
151
end
152
local function mqtt_client_run_req(connection_type)
153
--创建并且启动一个task
154
--运行这个task的处理函数mqtt_client_main_task_func
155
sys.taskInitEx(mqtt_client_main_task_func, TASK_NAME, nil, connection_type)
156
end
157
local function mqtt_client_close_req()
158
sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CLOSE")
159
end
160
sys.subscribe("MQTT_CLIENT_RUN_REQ", mqtt_client_run_req)
161
sys.subscribe("MQTT_CLIENT_CLOSE_REQ", mqtt_client_close_req)
3.4.7 app_mqtt_sender.lua
1
--[[
2
@module app_mqtt_sender
3
@summary MQTT客户端数据发送应用模块
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件为MQTT客户端数据发送应用模块,核心业务逻辑为:
9
1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)订阅"SEND_DATA_REQ"消息,将其他应用模块需要发送的数据存储到队列send_queue中;
10
2、mqtt sender task接收"CONNECT OK"、"PUBLISH_REQ"、"PUBLISH OK"三种类型的"MQTT_EVENT"消息,遍历队列send_queue,逐条发送数据到server;
11
3、mqtt sender task接收"DISCONNECTED"类型的"MQTT_EVENT"消息,丢弃掉队列send_queue中未发送的数据;
12
4、任何一条数据无论发送成功还是失败,只要这条数据有回调函数,都会通过回调函数通知数据发送方;
13
本文件的对外接口有1个:
14
1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func):订阅"SEND_DATA_REQ"消息;
15
其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的topic,payload和qos以及回调函数和回调参数一起publish出去;
16
]]
17
local mqtt_sender = {}
18
--[[
19
数据发送队列,数据结构为:
20
{
21
[1] = {topic="topic1", payload="payload1", qos=0, cb={func=callback_function1, para=callback_para1}},
22
[2] = {topic="topic2", payload="payload2", qos=1, cb={func=callback_function2, para=callback_para2}},
23
[3] = {topic="topic3", payload="payload3", qos=2, cb={func=callback_function3, para=callback_para3}},
24
}
25
topic的内容为publish的主题,string类型,必须存在;
26
payload的内容为publish的负载数据,string类型,必须存在;
27
qos的内容为publish的质量等级,number类型,取值范围0,1,2,可选,如果用户没有指定,默认为0;
28
cb.func的内容为数据发送结果的用户回调函数,可以不存在;
29
cb.para的内容为数据发送结果的用户回调函数的回调参数,可以不存在;
30
]]
31
local send_queue = {}
32
-- mqtt client的任务名前缀
33
mqtt_sender.TASK_NAME_PREFIX = "mqtt_"
34
-- mqtt_client_sender的任务名
35
mqtt_sender.TASK_NAME = mqtt_sender.TASK_NAME_PREFIX.."sender"
36
-- "SEND_DATA_REQ"消息的处理函数
37
local function send_data_req_proc_func(tag, topic, payload, qos, cb)
38
-- 将原始数据增加前缀,然后插入到发送队列send_queue中
39
table.insert(send_queue, {topic=topic, payload=payload, qos=qos or 0, cb=cb})
40
-- 发送消息通知 mqtt sender task,有新数据等待发送
41
sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_REQ")
42
end
43
-- 按照顺序发送send_queue中的数据
44
-- 如果调用publish接口成功,则返回当前正在发送的数据项
45
-- 如果调用publish接口失败,通知回调函数发送失败后,继续发送下一条数据
46
local function publish_item(mqtt_client)
47
local item
48
-- 如果发送队列中有数据等待发送
49
while #send_queue>0 do
50
-- 取出来第一条数据赋值给item
51
-- 同时从队列send_queue中删除这一条数据
52
item = table.remove(send_queue, 1)
53
-- publish数据
54
-- result表示调用publish接口的同步结果,返回值有以下几种:
55
-- 如果失败,返回nil
56
-- 如果成功,number类型,qos为0时直接返回0;qos为1或者2时返回publish报文的message id
57
result = mqtt_client:publish(item.topic, item.payload, item.qos)
58
-- publish接口调用成功
59
if result then return item
60
-- publish接口调用失败
61
else -- 如果当前发送的数据有用户回调函数,则执行用户回调函数 if item.cb and item.cb.func then item.cb.func(false, item.cb.para) end
62
end
63
end
64
end
65
local function publish_item_cbfunc(item, result)
66
if item then
67
-- 如果当前发送的数据有用户回调函数,则执行用户回调函数
68
if item.cb and item.cb.func then item.cb.func(result, item.cb.para)
69
end
70
end
71
end
72
-- mqtt client sender的任务处理函数
73
local function mqtt_client_sender_task_func()
74
local mqtt_client
75
local send_item
76
local result, msg
77
while true do
78
-- 等待"MQTT_EVENT"消息
79
msg = sys.waitMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT")
80
-- mqtt连接成功
81
-- msg[3]表示mqtt client对象
82
if msg[2] == "CONNECT_OK" then mqtt_client = msg[3] -- 发送send_queue中的数据 send_item = publish_item(mqtt_client)
83
-- mqtt publish数据请求
84
elseif msg[2] == "PUBLISH_REQ" then -- 如果mqtt client对象存在,并且没有正在等待发送结果的发送数据项 if mqtt_client and not send_item then -- 发送send_queue中的数据 send_item = publish_item(mqtt_client) end
85
-- mqtt publish数据成功
86
elseif msg[2] == "PUBLISH_OK" then -- publish成功,执行回调函数通知发送方 publish_item_cbfunc(send_item, true) -- publish成功,通知网络环境检测看门狗功能模块进行喂狗 sys.publish("FEED_NETWORK_WATCHDOG") -- 发送send_queue中的数据 send_item = publish_item(mqtt_client)
87
-- mqtt断开连接
88
elseif msg[2] == "DISCONNECTED" then -- 清空mqtt client对象 mqtt_client = nil -- 如果存在正在等待发送结果的发送项,执行回调函数通知发送方失败 publish_item_cbfunc(send_item, false) -- 如果发送队列中有数据等待发送 while #send_queue>0 do -- 取出来第一条数据赋值给send_item -- 同时从队列send_queue中删除这一条数据 send_item = table.remove(send_queue,1) -- 执行回调函数通知发送方失败 publish_item_cbfunc(send_item, false) end -- 当前没有正在等待发送结果的发送项 send_item = nil
89
end
90
end
91
end
92
-- 订阅"SEND_DATA_REQ"消息;
93
-- 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的数据以及回调函数和回调参数一起publish出去;
94
sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)
95
--创建并且启动一个task
96
--运行这个task的处理函数mqtt_client_sender_task_func
97
sys.taskInitEx(mqtt_client_sender_task_func, mqtt_sender.TASK_NAME)
98
return mqtt_sender
3.4.8 app_mqtt_receiver.lua
1
--[[
2
@module app_mqtt_receiver
3
@summary MQTT客户端数据接收应用模块
4
@version 1.0
5
@date 2026.03.15
6
@author LuatOS
7
@usage
8
本文件为MQTT客户端数据接收应用模块,核心业务逻辑为:
9
1、提供mqtt_receiver.proc接口,对接收到的publish数据进行处理;
10
本文件的对外接口有1个:
11
1、mqtt_receiver.proc(data, payload, metas):处理接收到的publish数据;
12
data:string类型,表示topic;
13
payload:string类型,表示payload;
14
metas:table类型,表示消息的元数据;
15
]]
16
local mqtt_receiver = {}
17
-- 处理接收到的publish数据
18
-- data:string类型,表示topic
19
-- payload:string类型,表示payload
20
-- metas:table类型,数据内容如下
21
-- {
22
-- qos: number类型,取值范围0,1,2
23
-- retain:number类型,取值范围0,1
24
-- dup:number类型,取值范围0,1
25
-- message_id: number类型
26
-- }
27
function mqtt_receiver.proc(data, payload, metas)
28
-- 打印接收到的数据
29
log.info("mqtt_receiver.proc", "topic:", data, "payload:", payload, "metas:", json.encode(metas))
30
-- 这里可以根据实际需求处理接收到的数据
31
-- 例如:解析命令、控制设备等
32
end
33
return mqtt_receiver
本文详细说明如何使用 luatos-docs-code 开发 MQTT 温湿度上传项目,硬件采用 Air780EPM,软件具备稳定长连接、自动重连、定时采样上传能力。代码分为主入口与功能模块,配置规范、目录统一,可直接用于学习与二次开发。
今天的内容分享到这里了