Centos7搭建Mosquitto,Golang使用MQTT

介绍

MQTT是一种机器到机器消息协议,旨在为“物联网”设备提供轻量级发布/订阅通信。它通常用于车辆的地理跟踪车队,家庭自动化,环境传感器网络和公用事业规模的数据收集。
Mosquitto是一个实现了MQTT3.1协议的MQTT服务器(或代理 ,在MQTT中的用法),具有良好的社区支持,易于安装和配置。
在我的应用场景中,云服务器和食堂消费终端需要数据交互,终端拉取服务器数据使用HTTP,服务器通知终端使用MQTT。 本文的主旨在于记录Mosquitto服务的安装和使用、基于Golang实现发布订阅,以备日后查阅。

一、服务器安装Mosquitto

服务器 CentOS Linux release 7.3.1611 (Core)
检查系统是否已安装EPEL(Extra Packages for Enterprise Linux)软件源。

检查安装epel源

检查 epel 是否已安装

// 如下已存在epel源,则不需要安装
➜ yum repolist | grep epel
!epel/x86_64               Extra Packages for Enterprise Linux 7 - x86_64 13,242

如果不存在,则需要安装 epel

➜ yum -y install epel-release

安装mosquitto

➜ yum -y install mosquitto

启动mosquitto

// 启动
➜ systemctl start mosquitto
// 停止
➜ systemctl stop mosquitto
// 重新启动
➜ systemctl restart mosquitto
// 查看运行状态 
➜ systemctl status mosquitto
// 设为开机启动
➜ systemctl enable mosquitto

测试Mosquitto

开启两个终端窗口,一个负责订阅,另一个负责发布。


// 订阅
➜ mosquitto_sub -h localhost -t test

// 发布
➜ mosquitto_pub -h localhost -t test -m "hello world"
  • -h host, 服务器的地址
  • -t topic,发布的主题,订阅的主题和发布的一致才会收到信息
  • -m message, 消息内容

mosquitto_pubmosquitto_sub 参数详解可以使用 man命令查看,或访问官方地址: mosquitto_pub命令详解 mosquitto_sub命令详解

修改Mosquitto配置

配置文件位置/etc/mosquitto/mosquitto.conf,默认无密码可以访问,端口号为1883。设置一下用户名和密码,mqtt 才会比较安全。
原配置文件内容过多,为方便查看,新建一个空白文件代替默认的。

  • 新建用户名密码
// uname1 为你的用户名
➜ mosquitto_passwd -c /etc/mosquitto/passwd username1

// 如果要添加多个用户使用 -b 参数 
// 必须在控制台输入明文的密码,且文件不会覆盖之前的
➜ mosquitto_passwd -b /etc/mosquitto/passwd username2 password2
  • 备份默认配置
➜ mv /etc/mosquitto/mosquitto.conf /etc/mosquitto/mosquitto.conf.example
  • 新建文件 vim /etc/mosquitto/mosquitto.conf,写入如下内容
# 禁止匿名访问
allow_anonymous false
# 用户及密码存储文件
password_file /etc/mosquitto/passwd
# 端口号
port 8810

重启测试

  • 重启mosquitto
➜ systemctl restart mosquitto
  • 不带用户名和密码进行测试
// 指定端口号8810
// 不带用户名和密码,访问被拒绝
➜ mosquitto_pub -h localhost -t test -p 8810 -m "hello world"
Connection error: Connection Refused: not authorised.
  • 使用用户名和密码测试发布订阅

在一个窗口执行订阅:

// -u 指定用户名
// -P 指定密码
➜ mosquitto_sub -h localhost -t test -p 8810  -u 'username1' -P 'password1'

在另一个终端窗口发布消息:

➜ mosquitto_pub -h localhost -t test -p 8810  -u 'username1' -P 'password1' -m 'hello world'

防火墙开放端口

服务器使用的firewall,如果需要远程测试,需要打开mosquitto端口

// 查看所有打开的端口
➜ firewall-cmd --zone=public --list-ports
// 打开8810端口
➜ firewall-cmd --zone=public --add-port=8810/tcp --permanent
// 重启防火墙
➜ firewall-cmd --reload

二、Golang发布订阅MQTT

项目目录结构:

|____mqtt
| |____mqtt.go
|____mqtt_pub.go
|____mqtt_sub.go
|____service
| |____lib.go

封装mqtt

使用包 github.com/eclipse/paho.mqtt.golang 封装,修改其中的配置 Host、UserName、Password
新建文件mqtt/mqtt.go

package mqtt

import (
    "encoding/json"
    "errors"
    "fmt"
    gomqtt "github.com/eclipse/paho.mqtt.golang"
    "strings"
    "sync"
    "time"
)

// mqtt服务器配置
const (
    Host     = "127.0.0.1:8810"
    UserName = "test"
    Password = "123456"
)

type Client struct {
    nativeClient  gomqtt.Client
    clientOptions *gomqtt.ClientOptions
    locker        *sync.Mutex
    // 消息收到之后处理函数
    observer func(c *Client, msg *Message)
}

type Message struct {
    // client_id
    ClientID string `json:"client_id"`
    // 接口名,订阅号通过识别接口名处理相应业务
    Action string `json:"action"`
    // 数据类型
    Type string `json:"type"`
    // 发布时间
    Time int64 `json:"time"`
    // 业务数据的header,可以携带一些系统参数
    Header interface{} `json:"header"`
    // 业务数据的body,业务参数
    Body interface{} `json:"body"`
}

func NewClient(clientId string) *Client {
    clientOptions := gomqtt.NewClientOptions().
        AddBroker(Host).
        SetUsername(UserName).
        SetPassword(Password).
        SetClientID(clientId).
        SetCleanSession(false).
        SetAutoReconnect(true).
        SetKeepAlive(120 * time.Second).
        SetPingTimeout(10 * time.Second).
        SetWriteTimeout(10 * time.Second).
        SetOnConnectHandler(func(client gomqtt.Client) {
            // 连接被建立后的回调函数
            fmt.Println("Mqtt is connected!", "clientId", clientId)
        }).
        SetConnectionLostHandler(func(client gomqtt.Client, err error) {
            // 连接被关闭后的回调函数
            fmt.Println("Mqtt is disconnected!", "clientId", clientId, "reason", err.Error())
        })

    nativeClient := gomqtt.NewClient(clientOptions)

    return &Client{
        nativeClient:  nativeClient,
        clientOptions: clientOptions,
        locker:        &sync.Mutex{},
    }
}

func (client *Client) GetClientID() string {
    return client.clientOptions.ClientID
}

func (client *Client) Connect() error {
    return client.ensureConnected()
}

// 确保连接
func (client *Client) ensureConnected() error {
    if !client.nativeClient.IsConnected() {
        client.locker.Lock()
        defer client.locker.Unlock()
        if !client.nativeClient.IsConnected() {
            if token := client.nativeClient.Connect(); token.Wait() && token.Error() != nil {
                return token.Error()
            }
        }
    }
    return nil
}

// 发布消息
// retained: 是否保留信息
func (client *Client) Publish(topic string, qos byte, retained bool, data []byte) error {
    if err := client.ensureConnected(); err != nil {
        return err
    }

    token := client.nativeClient.Publish(topic, qos, retained, data)
    if err := token.Error(); err != nil {
        return err
    }

    // return false is the timeout occurred
    if !token.WaitTimeout(time.Second * 10) {
        return errors.New("mqtt publish wait timeout")
    }

    return nil
}

// 消费消息
func (client *Client) Subscribe(observer func(c *Client, msg *Message), qos byte, topics ...string) error {
    if len(topics) == 0 {
        return errors.New("the topic is empty")
    }

    if observer == nil {
        return errors.New("the observer func is nil")
    }

    if client.observer != nil {
        return errors.New("an existing observer subscribed on this client, you must unsubscribe it before you subscribe a new observer")
    }
    client.observer = observer

    filters := make(map[string]byte)
    for _, topic := range topics {
        filters[topic] = qos
    }
    client.nativeClient.SubscribeMultiple(filters, client.messageHandler)

    return nil
}

func (client *Client) messageHandler(c gomqtt.Client, msg gomqtt.Message) {
    if client.observer == nil {
        fmt.Println("not subscribe message observer")
        return
    }
    message, err := decodeMessage(msg.Payload())
    if err != nil {
        fmt.Println("failed to decode message")
        return
    }
    client.observer(client, message)
}

func decodeMessage(payload []byte) (*Message, error) {
    message := new(Message)
    decoder := json.NewDecoder(strings.NewReader(string(payload)))
    decoder.UseNumber()
    if err := decoder.Decode(&message); err != nil {
        return nil, err
    }
    return message, nil
}

func (client *Client) Unsubscribe(topics ...string) {
    client.observer = nil
    client.nativeClient.Unsubscribe(topics...)
}

map解析成struct

基于包github.com/goinggo/mapstructure 将订阅者收到的map解析为相应的struct。
新建文件 service/lib.go

package service

import "github.com/goinggo/mapstructure"

// User 一个用于测试的struct
type User struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// MapToStruct map转struct.
// mqtt解析字段直接使用项目通用的json标签
// mqtt传输int会转为string(int),需要开启 WeaklyTypedInput
func MapToStruct(m interface{}, structPointer interface{}) error {
    // https://godoc.org/github.com/mitchellh/mapstructure#DecoderConfig
    config := &mapstructure.DecoderConfig{
        TagName:          "json",
        Result:           structPointer,
        WeaklyTypedInput: true,
    }
    newDecode, _ := mapstructure.NewDecoder(config)

    if err := newDecode.Decode(m); err != nil {
        return err
    }
    return nil
}

发布者

新建文件 mqtt_pub.go

package main

import (
    "encoding/json"
    "github.com/astaxie/beego/logs"
    "github.com/gushasha/go-mqtt/mqtt"
    "github.com/gushasha/go-mqtt/service"
    "time"
)

func main() {

    const (
        clientId = "pub-001"
        // topic规则:设备编号/接口名
        topicName      = "device001/user"
        actionNameUser = "user/detail"
    )

    client := mqtt.NewClient(clientId)
    err := client.Connect()
    if err != nil {
        logs.Error(err.Error())
    }

    // 发布一个 user 消息
    body := service.User{1, "小宝", 2}

    msg := &mqtt.Message{
        ClientID: clientId,
        Action:   actionNameUser,
        Type:     "json",
        Time:     time.Now().Unix(),
        Body:     body,
    }
    data, _ := json.Marshal(msg)
    err = client.Publish(topicName, 0, false, data)
    if err != nil {
        panic(err)
    }
}

订阅者

新建文件 mqtt_sub.go

package main

import (
    "fmt"
    "github.com/gushasha/go-mqtt/mqtt"
    "github.com/gushasha/go-mqtt/service"
    "sync"
)

var (
    wg sync.WaitGroup
)

const (
    clientId = "client-001"
    topicName      = "device001/#"
    actionNameUser = "user/detail"
)

func main() {

    client := mqtt.NewClient(clientId)
    err := client.Connect()
    if err != nil {
        panic(err.Error())
    }

    wg.Add(1)
    err = client.Subscribe(userHandler, 0, topicName)
    if err != nil {
        panic(err)
    }
    wg.Wait()
}

var userHandler = func(c *mqtt.Client, msg *mqtt.Message) {
    switch msg.Action {
    case actionNameUser:
        // map 转 service.User
        user := &service.User{}
        err := service.MapToStruct(msg.Body, user);
        if err != nil {
            fmt.Println("error:", err)
        } else {
            HandleUser(user)
        }
    default:
        fmt.Printf("unkonwn action %s \n", msg.Action)
    }
}

func HandleUser(user *service.User) {
    fmt.Printf("user: %#v \n", user)
}

执行测试

// 在一个终端窗口执行订阅
➜ go run mqtt_sub.go

// 另一个终端窗口执行发布
➜ go run mqtt_pub.go

// 订阅端收到信息,解析为struct:
// mqttuser: &service.User{ID:1, Name:"小宝", Age:2}

参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,383评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,522评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,852评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,621评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,741评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,929评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,076评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,803评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,265评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,582评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,716评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,395评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,039评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,027评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,488评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,612评论 2 350