mqtt_client: ^10.2.0
/// Singleton helper that wraps an [MqttBrowserClient] (MQTT over WebSocket)
/// for use in a browser / Flutter web application.
///
/// Call [connect] once before publishing or subscribing. Accessors with a
/// nullable return type return `null` (and [mqttCurrentStatus] reports
/// [MqttConnectionState.disconnected]) when no client has been created yet.
class MqttWebTool {
  /// Quality of service used for every publish and subscribe issued here.
  MqttQos qos = MqttQos.atLeastOnce;

  /// Backing storage for [mqttClient]; `null` until [connect] is called.
  MqttBrowserClient? _client;

  /// The underlying browser client.
  ///
  /// Throws a [StateError] when read before [connect] has created a client.
  MqttBrowserClient get mqttClient {
    final client = _client;
    if (client == null) {
      throw StateError(
          'MqttWebTool.connect() must be called before using mqttClient');
    }
    return client;
  }

  set mqttClient(MqttBrowserClient value) => _client = value;

  static MqttWebTool? _instance;

  /// True while a connection attempt (the initial connect or an automatic
  /// reconnect) is in progress.
  static bool isConnecting = false;

  /// Returns the shared instance, creating it on first use.
  static MqttWebTool? getInstance() {
    return _instance ??= MqttWebTool();
  }

  /// Creates a new browser client and connects it to the broker.
  ///
  /// The WebSocket URL is built as `ws://server:port/mqtt`, or `wss://...`
  /// when [isSsl] is true. [username] and [password] are sent in the CONNECT
  /// message. Any client created by an earlier call is disconnected first so
  /// it does not stay open and keep reconnecting in the background.
  ///
  /// Returns the future produced by the client's `connect()`; connection
  /// failures are reported through that future.
  Future<MqttClientConnectionStatus?> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false}) {
    final previous = _client;
    if (previous != null) {
      // Stop the old client from reconnecting before dropping it.
      previous.autoReconnect = false;
      previous.disconnect();
    }

    final scheme = isSsl ? 'wss' : 'ws';
    final client = MqttBrowserClient.withPort(
        '$scheme://$server:$port/mqtt', clientIdentifier, port);
    _client = client;

    // Keep-alive (heartbeat) interval, in seconds.
    client.keepAlivePeriod = 20;
    // Let the library re-establish the connection by itself. The callbacks
    // below only track state; they must not call connect() themselves,
    // because that would race with the library's own reconnect sequence.
    client.autoReconnect = true;
    client.onAutoReconnect = _onAutoReconnect;
    client.onAutoReconnected = _onAutoReconnected;
    client.onConnected = onConnected;
    client.onDisconnected = onDisconnected;
    client.onSubscribed = _onSubscribed;
    client.onSubscribeFail = _onSubscribeFail;
    client.onUnsubscribed = _onUnSubscribed;
    // Called when a PING response is received.
    client.pongCallback = pong;

    client.connectionMessage = MqttConnectMessage()
        .authenticateAs(username, password)
        .withWillTopic('willTopic')
        .withWillMessage('Will message')
        .startClean();

    LogUtils.println("_正在连接中...");
    isConnecting = true;
    // Clear the flag whether the attempt succeeds or fails; the result or
    // error is still delivered to the caller unchanged.
    return client.connect().whenComplete(() {
      isConnecting = false;
    });
  }

  /// Disables auto-reconnect and disconnects from the broker.
  ///
  /// Does nothing when no client has been created.
  disconnect() {
    final client = _client;
    if (client == null) {
      return;
    }
    // Turn auto-reconnect off first so this disconnect is final.
    client.autoReconnect = false;
    client.disconnect();
    LogUtils.println("_disconnect");
  }

  /// Callback invoked by the client when the connection is closed.
  void onDisconnected() {
    LogUtils.println('已断开连接,正在重新连接...');
    isConnecting = false;
  }

  /// Callback invoked when a PING response is received. Currently a no-op.
  void pong() {}

  /// Publishes [msg] to [pTopic] using [qos], not retained.
  ///
  /// The text is encoded as UTF-8 so that non-ASCII characters survive the
  /// trip; ASCII-only messages produce the same bytes as before.
  /// Returns the message identifier reported by the client.
  int publishMessage(String pTopic, String msg) {
    final builder = MqttClientPayloadBuilder()..addUTF8String(msg);
    return mqttClient.publishMessage(pTopic, qos, builder.payload!,
        retain: false);
  }

  /// Current connection state, or [MqttConnectionState.disconnected] when no
  /// client has been created or no status is available.
  MqttConnectionState mqttCurrentStatus() {
    return _client?.connectionStatus?.state ??
        MqttConnectionState.disconnected;
  }

  /// Subscribes to [subtopic] with [qos].
  ///
  /// Returns `null` when no client has been created.
  Subscription? subscribeMessage(String subtopic) {
    return _client?.subscribe(subtopic, qos);
  }

  /// Unsubscribes from [unSubtopic]; a null or empty topic is ignored.
  void unsubscribeMessage(String? unSubtopic) {
    if (unSubtopic == null || unSubtopic.isEmpty) {
      return;
    }
    _client?.unsubscribe(unSubtopic);
  }

  /// Connection status object of the client, or `null` when no client has
  /// been created.
  MqttClientConnectionStatus? getMqttStatus() {
    return _client?.connectionStatus;
  }

  /// Stream of received messages, or `null` when no client has been created.
  Stream<List<MqttReceivedMessage<MqttMessage>>>? updates() {
    final client = _client;
    if (client == null) {
      return null;
    }
    LogUtils.println("_监听成功!");
    return client.updates;
  }

  /// Callback invoked by the client once the connection is established.
  onConnected() {
    isConnecting = false;
    LogUtils.println("链接上了");
  }

  /// Unsubscribes from the fixed topic below.
  // NOTE(review): the topic looks like a redacted placeholder - confirm the
  // real topic name before relying on this method.
  unSubscribeTopic() {
    String topic = "*********";
    unsubscribeMessage(topic);
  }

  /// Callback invoked when the library starts an automatic reconnect.
  void _onAutoReconnect() {
    isConnecting = true;
    LogUtils.println('正在尝试重新连接');
  }

  /// Callback invoked when an automatic reconnect has completed.
  void _onAutoReconnected() {
    isConnecting = false;
  }

  /// Callback invoked when a subscription succeeds.
  void _onSubscribed(String topic) {
    LogUtils.println("_订阅主题成功---topic:$topic");
  }

  /// Callback invoked when an unsubscribe succeeds.
  void _onUnSubscribed(String? topic) {
    LogUtils.println("_取消订阅主题成功---topic:$topic");
  }

  /// Callback invoked when a subscription fails.
  void _onSubscribeFail(String topic) {
    LogUtils.println("_onSubscribeFail");
  }
}
参考文章:
https://blog.csdn.net/ZQ200720/article/details/139065097