官方文档
https://www.emqx.com/zh/blog/how-to-use-mqtt-in-golang
连接
var once sync.Once
var MqttClient mqtt.Client
var MqttMessage mqtt.Message
func ConnectMqtt(pact, broker, port, username, password string) {
once.Do(func() {
MqttClient = NewClient(pact, broker, port, username, password)
})
}
func NewClient(pact, broker, port, username, password string) mqtt.Client {
//取配置
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("%s://%s:%s", pact, broker, port))
//todo 如果使用固定的clientID会出现在第一次重连成功后,一直断开再重连再断开的死循环
//处理这个问题的方法就是每次重新连接的时候使用新的clientId
u1 := uuid.New()
clientId := u1.String()
opts.SetClientID(clientId)
opts.SetUsername(username)
opts.SetPassword(password)
//Keep Alive 的最大值为 18 小时 12 分 15 秒;
opts.SetKeepAlive(18 * time.Hour)
opts.SetDefaultPublishHandler(MessagePubHandler)
//把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
//当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
//但是这个只是进行了重连,重连后还需要再次发起订阅
opts.CleanSession = false
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
return client
}
var MessagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
//实现数据存储或者入库
logger.ZapLogger.Infof("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
// 连接回调
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected Ok")
logger.ZapLogger.Infof("MQTT Connect SUCCESS")
}
// 丢失回调
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost :%v", err)
logger.ZapLogger.Errorf("Connect lost :%v", err)
}
发送数据和订阅接收
type MqttInterface interface {
//发送
PublishMessage(msg any)
//订阅
Subscription()
}
type MqttC struct {
Topic string
MqttMessage mqtt.Message
}
func NewMqttCRes(topic string) *MqttC {
return &MqttC{
Topic: topic,
}
}
func (m *MqttC) PublishMessage(msg any) {
//等级:0 消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
//1消息至少传送一次。
//2消息只传送一次。
qos := byte(1)
retained := false
token := MqttClient.Publish(m.Topic, qos, retained, msg)
if token.Error() != nil {
fmt.Printf("Error while publishing %v", token.Error())
}
token.Wait()
}
func (m *MqttC) Subscription() {
qos := byte(0)
token := MqttClient.Subscribe(m.Topic, qos, MessagePubHandler)
token.Wait()
fmt.Printf("Subscribed to topic: %s", m.Topic)
}