基于MQTT协议实现Broker

写在前面:

前一篇文字<<基于MQTT协议谈谈物联网开发>>主要叙述了MQTT协议的编解码以及基于MQTT协议的一些常见应用场景,并以一个简单的消息推送系统作为例子阐述具体MQTT应用的开发,这篇文字继续叙述上述应用中Mqtt Broker部分的实现.

1.Mqtt Broker开源产品:

Mqtt Broker简单来说就是基于MQTT协议的Server实现,并且github上有很多开源实现,如下图:


mqtt-broker开源代码实现

各种Mqtt Broker开源实现的几个关键技术指标对比如下图:


broker技术指标

可以根据业务的需求不同选用不同的broker开源实现,总体来说,对Mqtt Broker来说,要求高可用,无状态,易扩容,可靠传输等,下面就以Mqtt Broker的实现作为例子,讲讲基于MQTT协议的Broker具体开发,不考虑太复杂的业务逻辑,仅以最简洁的方式,阐述整个流程,基于Golang和RabbitMQ开发.

2.Mqtt Broker具体实现:

2.1Mqtt Broker架构草图:

broker架构草图

2.2Mqtt Broker实现细节说明:

(1)Broker与RabbitMQ建立TCP连接,实现断线重连逻辑以及断线通知机制;
(2)Broker声明与RabbitMQ相关的exchanger, amqp队列,以及rpc队列;
(3)Broker监听APP端mqttclient的tcp连接,对应每个连接,Broker实例化一个appClient;
(4)每个appClient处理与之对应的APP业务请求,包括发布,订阅,取消订阅,心跳等;
(5)每个appClient获取一个channel,通过channel与RabbitMQ进行通信;
(6)若Broker与RabbitMQ发生断连,则channel不可用,必须等待重连成功,然后重新获取channel;
(7)若RabbitMQ发生重启,之前对应APP需要订阅的数据,每个appClient必须重新订阅;
(8)若APP断开连接,则对应的appClient必须释放所有资源并结束生命周期,不再服务;
(9)其他...

2.3Mqtt Broker代码实现:

根据上述的架构草图以及细节说明,就整个架构比较关键的几个环节,用具体代码实现加以阐述,以求透彻.
(1)Broker与RabbitMQ连接:
定义相关数据结构以及方法如下:

package rabbit

var (
    RabbitServerAddr = flag.String("rabbitserveraddr", "amqp://guest:guest@localhost:5672", "")
)
const (
    CONNECT_INTERVAL = 3
)
var _rabbit *Rabbit = nil
var _once sync.Once
//定义Rabbit结构
type Rabbit struct {
    wg                  *sync.WaitGroup
    rabbitconn          *amqp.Connection
    rabbitconnListeners map[string]chan bool
    _mutex              sync.Mutex
}
//获取Rabbit单例
func GetRabbitInstance(_wg *sync.WaitGroup, listenerID string, rabbitconnChan chan bool) *Rabbit {
    _once.Do(func() {
        _rabbit = &Rabbit{
            wg: _wg,
        }
        _rabbit.rabbitconnListeners = make(map[string]chan bool)
        _rabbit.initConn()
    })
    if listenerID != "" && rabbitconnChan != nil {
        _rabbit._mutex.Lock()
        _rabbit.rabbitconnListeners[listenerID] = rabbitconnChan
        _rabbit._mutex.Unlock()
    }

    return _rabbit
}
//建立RabbitMQ连接
func (r *Rabbit) initConn() {
    if r.rabbitconn != nil {
        return
    }
    conn, err := amqp.Dial(*RabbitServerAddr)
    if err != nil {
        time.Sleep(time.Second * CONNECT_INTERVAL)
        U.FailOnError(err)
    }
    r.rabbitconn = conn
    r.wg.Add(1)
    go r.checkConn()
}
//RabbitMQ断线重连
func (r *Rabbit) checkConn() {
    defer r.wg.Done()
    for {
        <-r.rabbitconn.NotifyClose(make(chan *amqp.Error))
        U.GetLog().Printf("r.conn.NotifyClose")
        r.broadcastConnStatus(false)
        for {
            conn, err := amqp.Dial(*RabbitServerAddr)
            if err == nil {
                r.rabbitconn = conn
                r.broadcastConnStatus(true)
                break
            }
            U.GetLog().Printf("amqp.Dial failed")
            r.broadcastConnStatus(false)
            time.Sleep(time.Second * CONNECT_INTERVAL)
        }
    }
}

(2)Broker声明exchanger, amqp队列,以及rpc队列:
定义相关数据结构以及方法如下:

package mqtt

type TOPIC_CHAN_TYPE map[string]chan ExchangeMessage
//定义BrokerExchanger结构
type BrokerExchanger struct {
    wg               *sync.WaitGroup
    channel          *amqp.Channel
    exchangerChan    chan *MqttClientCmd
    amqpqueueName    string
    amqpDeliveryChan <-chan amqp.Delivery
    amqpTopicChan    map[string]TOPIC_CHAN_TYPE
    rpcqueueName     string
    rpcDeliveryChan  <-chan amqp.Delivery
    rpcClientChan    map[string]chan ExchangeMessage
}
//实例化BrokerExchanger
func NewBrokerExchanger(_wg *sync.WaitGroup) *BrokerExchanger {
    _be := &BrokerExchanger{
        wg: _wg,
    }
    _be.once()
    return _be
}
//声明Exchange,区分amqp与rpc类型
func (be *BrokerExchanger) declareExchange(exctype string) error {
    if be.channel == nil {
        return fmt.Errorf("[BrokerExchanger::declareExchange] channel not ready")
    }
    var defaultExchange string
    var exchangeType string
    switch exctype {
    case defaultAMQPExchange:
        defaultExchange = defaultAMQPExchange
        exchangeType = "topic"
    case defaultRPCExchange:
        defaultExchange = defaultRPCExchange
        exchangeType = "direct"
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::declareExchange] unexpected exchange type:%s", exctype)
    }
    err := be.channel.ExchangeDeclare(
        defaultExchange, // name
        exchangeType,    // type
        true,            // durable
        false,           // auto-deleted
        false,           // internal
        false,           // no-wait
        nil,             // arguments
    )
    return err
}
//声明Queue,区分amqp与rpc类型
func (be *BrokerExchanger) declareQueue(exctype string) error {
    if be.channel == nil {
        return fmt.Errorf("[BrokerExchanger::declareQueue] channel not ready")
    }
    q, err := be.channel.QueueDeclare(
        "",    // name
        true,  // durable
        true,  // delete when usused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        return err
    }

    switch exctype {
    case defaultAMQPExchange:
        be.amqpqueueName = q.Name
    case defaultRPCExchange:
        be.rpcqueueName = q.Name
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::declareQueue] unexpected exchange type:%s", exctype)
    }
    return nil
}
//绑定Queue,区分amqp与rpc类型
func (be *BrokerExchanger) queueBind(exctype string) error {
    var queueName string
    switch exctype {
    case defaultAMQPExchange:
        return nil
    case defaultRPCExchange:
        queueName = be.rpcqueueName
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::queueBind] unexpected exchange type:%s", exctype)
    }

    err := be.channel.QueueBind(
        queueName,          // queue name
        queueName,          // routing key
        defaultRPCExchange, // exchange
        false,
        nil,
    )

    return err
}
//消费Queue,区分amqp与rpc类型
func (be *BrokerExchanger) consume(exctype string) error {
    var queueName string
    switch exctype {
    case defaultAMQPExchange:
        queueName = be.amqpqueueName
    case defaultRPCExchange:
        queueName = be.rpcqueueName
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
    }
    deliveryChan, err := be.channel.Consume(
        queueName, // queue
        "",        // consumer
        true,      // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if err != nil {
        return err
    }
    switch exctype {
    case defaultAMQPExchange:
        be.amqpDeliveryChan = deliveryChan
    case defaultRPCExchange:
        be.rpcDeliveryChan = deliveryChan
    default:
        U.GetLog().Printf("unexpected exchange type:%s", exctype)
        return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
    }
    return nil
}

说明:amqp发布订阅类型与rpc类型,在声明exchange时,exchange type字段不同,一个是topic,一个direct;

(3)MqttBroker定义及实现:
定义相关数据结构以及方法如下:

package mqtt
//定义MqttBroker结构
type MqttBroker struct {
    wg                 *sync.WaitGroup
    addr               string
    serverid           string
    rabbit             *rabbit.Rabbit
    channel            *amqp.Channel
    brokerexchanger    *BrokerExchanger
    mqttconnChan       chan net.Conn
    rabbitconnChan     chan bool
    connListeners      map[string]chan bool
    mqttClientChan     chan *MqttClientCmd
    clientMap          map[string]*MqttClient
    Authenticate       AuthenticateFunc
    AuthorizePublish   AuthorizePublishFunc
    AuthorizeSubscribe AuthorizeSubscribeFunc
}

//监听RabbitMQ连接状态
func (mb *MqttBroker) handleRabbitConnChan() {
    defer mb.wg.Done()
    for connStatus := range mb.rabbitconnChan {
        U.GetLog().Printf("serverid:%s, rabbitmq connection status:%v", mb.serverid, connStatus)
        if !connStatus {
            mb.reset()
        }
        if connStatus {
            mb.rabbitConnSuccessCallback()
        }
    }
}

//初始化Broker Exchanger
func (mb *MqttBroker) rabbitConnSuccessCallback() error {
    err := mb.initChannel()
    if err != nil {
        U.GetLog().Printf("initChannel error:%v", err)
        return err
    }
    err = mb.initBrokerExchanger()
    if err != nil {
        U.GetLog().Printf("mb.brokerexchanger.Init error:%v", err)
        return err
    }

    mb.broadcastRabbitConnStatus(true)
    return err
}
//监听APP连接
func (mb *MqttBroker) ListenAndServe() {
    defer mb.wg.Done()

    var listener net.Listener
    var err error
    listener, err = net.Listen("tcp", mb.addr)
    U.FailOnError(err)

    U.GetLog().Printf("listen and serve mqtt broker on %s", mb.addr)
    for {
        conn, err := listener.Accept()
        if err != nil {
            U.GetLog().Printf("accepting new connection error:%v", err)
            continue
        }
        mb.wg.Add(1)
        mb.mqttconnChan <- conn
    }
}
//针对每个APP连接,实例化MqttClient处理相关业务数据
func (mb *MqttBroker) handleMqttConnection() {
    defer mb.wg.Done()
    for conn := range mb.mqttconnChan {
        mqttclient, err := NewMqttClient(mb.wg, conn, mb)
        if err != nil {
            U.GetLog().Printf("NewMqttClient error:%v", err)
            continue
        }
        mb.clientMap[mqttclient.GetClientID()] = mqttclient
        mb.wg.Add(1)
        go mqttclient.Serve()
    }
}

(4)MqttClient定义及实现:
定义相关数据结构以及方法如下:

package mqtt
//定义MqttClient结构
type MqttClient struct {
    wg             *sync.WaitGroup
    tcpconn        net.Conn
    broker         *MqttBroker
    keepalive      int
    lastheartbeat  int
    clientid       string
    rabbit         *rabbit.Rabbit
    channel        *amqp.Channel
    exchangers     map[string]Exchanger
    topicMap       map[string]chan ExchangeMessage
    brokerChan     chan *MqttClientCmd
    rabbitconnChan chan bool
    needDisConn    bool
}
//监听处理APP的业务请求
func (mc *MqttClient) Serve() {
    defer mc.wg.Done()
    defer mc.commonDefer()
    mc.wg.Add(1)
    go mc.timer()
    for {
        if mc.needDisConn {
            break
        }
        packet, err := MP.ReadPacket(mc.tcpconn)
        if err != nil {
            U.GetLog().Printf("ReadPacket error:%v", err)
            mc.needDisConn = true
            break
        }
        switch packet.GetMqttType() {
        case MP.Connect:
            err = mc.handleConn(packet)
        case MP.Disconnect:
            err = mc.handleDisConn(packet)
        case MP.Pingreq:
            err = mc.handlePing(packet)
        case MP.Publish:
            err = mc.handlePublish(packet)
        case MP.Subscribe:
            err = mc.handleSubscibe(packet)
        case MP.Unsubscribe:
            err = mc.handleUnSubscribe(packet)
        default:
            U.GetLog().Printf("unexpected packet type:%v", packet.GetMqttType())
        }
        if err != nil {
            U.GetLog().Printf("handle packet error:%v", err)
        }
        if err == amqp.ErrClosed {
            mc.channel = nil
            U.GetLog().Printf("handle packet error amqp.ErrClosed:%v", amqp.ErrClosed)
            continue
        }
        mc.lastheartbeat = 0
    }
}
//处理mqtt连接报文
func (mc *MqttClient) handleConn(packet MP.ControlPacket) error {
    U.GetLog().Printf("receive connect request...")
    p := packet.(*MP.ConnectPacket)
    pA := MP.NewControlPacket(MP.Connack).(*MP.ConnackPacket)
    pA.ReturnCode = p.Validate()
    mc.keepalive = int(p.KeepaliveTimer) //mc.keepalive == 10
    if mc.broker != nil && mc.broker.Authenticate != nil {
        authRet := mc.broker.Authenticate(mc, string(p.Username), string(p.Password))
        if !authRet {
            pA.ReturnCode = MP.ErrRefusedBadUsernameOrPassword
        }
    }
    if pA.ReturnCode != MP.Accepted {
        mc.needDisConn = true
    }
    err := mc.trySendPacket(pA)
    return err
}
//给APP发送mqtt连接报文的回包
func (mc *MqttClient) trySendPacket(packet MP.ControlPacket) error {
    // U.GetLog().Printf("try send packet:%v", packet.GetMqttType())
    return packet.Write(mc.tcpconn)
}

说明:作为例子,以上仅以mqtt连接报文叙述了相关连接请求处理,包括用到了前一篇文字<<基于MQTT协议谈谈物联网开发>>中讲到的mqtt协议编解码方法;

3.运行测试

针对上述Mqtt Broker的具体实现,简单用nodejs编写了两个测试程序,一个是subscribe.js模拟订阅者,一个是publisher.js模拟生产者,如下:
生产者publisher.js:

var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'xixi', username:'hello', password: 'world', clean:false});
var num = 0;
setInterval(function () {
  for(var i = 0; i < 1; i++) {
    client.publish('someonelikeyou', 'hello mqtt ' + num,{qos:1, retain: true});
    console.log("publish topic someonelikeyou, num:", num++);
  }
}, 1000);

订阅者subscriber.js:

var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'haha', username:'hello', password: 'world', clean:false});
client.subscribe('someonelikeyou',{qos:1});
console.log("subscribe topic someonelikeyou");
var num = 0;
client.on('message', function (topic, message) {
      console.log(message.toString());
      num++ 
});

运行结果如下图所示:

生产者运行结果:
MQTT生产者运行结果
订阅者运行结果:
MQTT订阅者运行结果

MqttBroker运行结果:
MqttBroker运行结果

出于篇幅考虑,上述使用到的具体一些函数,如broadcastConnStatus,handlePing等,其具体实现就不一一列举出来了,也有一些成员变量,用到了也不一一具体注释了,主要通过代码关键路径叙述实现的一些细节,如有错误,恳请指出!!!

未完待续...
参考文字:mqtt

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,864评论 2 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • 一:前言 最近在了解MQTT协议相关的内容,内容有点多,特此把MQTT协议,以及其从服务端到客户端的流程整理出来...
    子夏的不语阅读 69,848评论 9 92
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • 那时候家里非常穷,一家四口挤在一个三十平方的屋里,那是爷爷奶奶住过的,后来听说是父亲为了给二叔娶妻才从上边三间屋搬...
    瑢姐姐阅读 195评论 0 0