写在前面:
前一篇文字<<基于MQTT协议谈谈物联网开发>>主要叙述了MQTT协议的编解码以及基于MQTT协议的一些常见应用场景,并以一个简单的消息推送系统作为例子阐述具体MQTT应用的开发,这篇文字继续叙述上述应用中Mqtt Broker部分的实现.
1.Mqtt Broker开源产品:
Mqtt Broker简单来说就是基于MQTT协议的Server实现,并且github上有很多开源实现,如下图:
各种Mqtt Broker开源实现的几个关键技术指标对比如下图:
可以根据业务的需求不同选用不同的broker开源实现,总体来说,对Mqtt Broker来说,要求高可用,无状态,易扩容,可靠传输等,下面就以Mqtt Broker的实现作为例子,讲讲基于MQTT协议的Broker具体开发,不考虑太复杂的业务逻辑,仅以最简洁的方式,阐述整个流程,基于Golang和RabbitMQ开发.
2.Mqtt Broker具体实现:
2.1Mqtt Broker架构草图:
2.2Mqtt Broker实现细节说明:
(1)Broker与RabbitMQ建立TCP连接,实现断线重连逻辑以及断线通知机制;
(2)Broker声明与RabbitMQ相关的exchanger, amqp队列,以及rpc队列;
(3)Broker监听APP端mqttclient的tcp连接,对应每个连接,Broker实例化一个appClient;
(4)每个appClient处理与之对应的APP业务请求,包括发布,订阅,取消订阅,心跳等;
(5)每个appClient获取一个channel,通过channel与RabbitMQ进行通信;
(6)若Broker与RabbitMQ发生断连,则channel不可用,必须等待重连成功,然后重新获取channel;
(7)若RabbitMQ发生重启,之前对应APP需要订阅的数据,每个appClient必须重新订阅;
(8)若APP断开连接,则对应的appClient必须释放所有资源并结束生命周期,不再服务;
(9)其他...
2.3Mqtt Broker代码实现:
根据上述的架构草图以及细节说明,就整个架构比较关键的几个环节,用具体代码实现加以阐述,以求透彻.
(1)Broker与RabbitMQ连接:
定义相关数据结构以及方法如下:
package rabbit
var (
RabbitServerAddr = flag.String("rabbitserveraddr", "amqp://guest:guest@localhost:5672", "")
)
const (
CONNECT_INTERVAL = 3
)
var _rabbit *Rabbit = nil
var _once sync.Once
//定义Rabbit结构
type Rabbit struct {
wg *sync.WaitGroup
rabbitconn *amqp.Connection
rabbitconnListeners map[string]chan bool
_mutex sync.Mutex
}
//获取Rabbit单例
func GetRabbitInstance(_wg *sync.WaitGroup, listenerID string, rabbitconnChan chan bool) *Rabbit {
_once.Do(func() {
_rabbit = &Rabbit{
wg: _wg,
}
_rabbit.rabbitconnListeners = make(map[string]chan bool)
_rabbit.initConn()
})
if listenerID != "" && rabbitconnChan != nil {
_rabbit._mutex.Lock()
_rabbit.rabbitconnListeners[listenerID] = rabbitconnChan
_rabbit._mutex.Unlock()
}
return _rabbit
}
//建立RabbitMQ连接
func (r *Rabbit) initConn() {
if r.rabbitconn != nil {
return
}
conn, err := amqp.Dial(*RabbitServerAddr)
if err != nil {
time.Sleep(time.Second * CONNECT_INTERVAL)
U.FailOnError(err)
}
r.rabbitconn = conn
r.wg.Add(1)
go r.checkConn()
}
//RabbitMQ断线重连
func (r *Rabbit) checkConn() {
defer r.wg.Done()
for {
<-r.rabbitconn.NotifyClose(make(chan *amqp.Error))
U.GetLog().Printf("r.conn.NotifyClose")
r.broadcastConnStatus(false)
for {
conn, err := amqp.Dial(*RabbitServerAddr)
if err == nil {
r.rabbitconn = conn
r.broadcastConnStatus(true)
break
}
U.GetLog().Printf("amqp.Dial failed")
r.broadcastConnStatus(false)
time.Sleep(time.Second * CONNECT_INTERVAL)
}
}
}
(2)Broker声明exchanger, amqp队列,以及rpc队列:
定义相关数据结构以及方法如下:
package mqtt
type TOPIC_CHAN_TYPE map[string]chan ExchangeMessage
//定义BrokerExchanger结构
type BrokerExchanger struct {
wg *sync.WaitGroup
channel *amqp.Channel
exchangerChan chan *MqttClientCmd
amqpqueueName string
amqpDeliveryChan <-chan amqp.Delivery
amqpTopicChan map[string]TOPIC_CHAN_TYPE
rpcqueueName string
rpcDeliveryChan <-chan amqp.Delivery
rpcClientChan map[string]chan ExchangeMessage
}
//实例化BrokerExchanger
func NewBrokerExchanger(_wg *sync.WaitGroup) *BrokerExchanger {
_be := &BrokerExchanger{
wg: _wg,
}
_be.once()
return _be
}
//声明Exchange,区分amqp与rpc类型
func (be *BrokerExchanger) declareExchange(exctype string) error {
if be.channel == nil {
return fmt.Errorf("[BrokerExchanger::declareExchange] channel not ready")
}
var defaultExchange string
var exchangeType string
switch exctype {
case defaultAMQPExchange:
defaultExchange = defaultAMQPExchange
exchangeType = "topic"
case defaultRPCExchange:
defaultExchange = defaultRPCExchange
exchangeType = "direct"
default:
U.GetLog().Printf("unexpected exchange type:%s", exctype)
return fmt.Errorf("[BrokerExchanger::declareExchange] unexpected exchange type:%s", exctype)
}
err := be.channel.ExchangeDeclare(
defaultExchange, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
return err
}
//声明Queue,区分amqp与rpc类型
func (be *BrokerExchanger) declareQueue(exctype string) error {
if be.channel == nil {
return fmt.Errorf("[BrokerExchanger::declareQueue] channel not ready")
}
q, err := be.channel.QueueDeclare(
"", // name
true, // durable
true, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
switch exctype {
case defaultAMQPExchange:
be.amqpqueueName = q.Name
case defaultRPCExchange:
be.rpcqueueName = q.Name
default:
U.GetLog().Printf("unexpected exchange type:%s", exctype)
return fmt.Errorf("[BrokerExchanger::declareQueue] unexpected exchange type:%s", exctype)
}
return nil
}
//绑定Queue,区分amqp与rpc类型
func (be *BrokerExchanger) queueBind(exctype string) error {
var queueName string
switch exctype {
case defaultAMQPExchange:
return nil
case defaultRPCExchange:
queueName = be.rpcqueueName
default:
U.GetLog().Printf("unexpected exchange type:%s", exctype)
return fmt.Errorf("[BrokerExchanger::queueBind] unexpected exchange type:%s", exctype)
}
err := be.channel.QueueBind(
queueName, // queue name
queueName, // routing key
defaultRPCExchange, // exchange
false,
nil,
)
return err
}
//消费Queue,区分amqp与rpc类型
func (be *BrokerExchanger) consume(exctype string) error {
var queueName string
switch exctype {
case defaultAMQPExchange:
queueName = be.amqpqueueName
case defaultRPCExchange:
queueName = be.rpcqueueName
default:
U.GetLog().Printf("unexpected exchange type:%s", exctype)
return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
}
deliveryChan, err := be.channel.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
switch exctype {
case defaultAMQPExchange:
be.amqpDeliveryChan = deliveryChan
case defaultRPCExchange:
be.rpcDeliveryChan = deliveryChan
default:
U.GetLog().Printf("unexpected exchange type:%s", exctype)
return fmt.Errorf("[BrokerExchanger::consume] unexpected exchange type:%s", exctype)
}
return nil
}
说明:amqp发布订阅类型与rpc类型,在声明exchange时,exchange type字段不同,一个是topic,一个direct;
(3)MqttBroker定义及实现:
定义相关数据结构以及方法如下:
package mqtt
//定义MqttBroker结构
type MqttBroker struct {
wg *sync.WaitGroup
addr string
serverid string
rabbit *rabbit.Rabbit
channel *amqp.Channel
brokerexchanger *BrokerExchanger
mqttconnChan chan net.Conn
rabbitconnChan chan bool
connListeners map[string]chan bool
mqttClientChan chan *MqttClientCmd
clientMap map[string]*MqttClient
Authenticate AuthenticateFunc
AuthorizePublish AuthorizePublishFunc
AuthorizeSubscribe AuthorizeSubscribeFunc
}
//监听RabbitMQ连接状态
func (mb *MqttBroker) handleRabbitConnChan() {
defer mb.wg.Done()
for connStatus := range mb.rabbitconnChan {
U.GetLog().Printf("serverid:%s, rabbitmq connection status:%v", mb.serverid, connStatus)
if !connStatus {
mb.reset()
}
if connStatus {
mb.rabbitConnSuccessCallback()
}
}
}
//初始化Broker Exchanger
func (mb *MqttBroker) rabbitConnSuccessCallback() error {
err := mb.initChannel()
if err != nil {
U.GetLog().Printf("initChannel error:%v", err)
return err
}
err = mb.initBrokerExchanger()
if err != nil {
U.GetLog().Printf("mb.brokerexchanger.Init error:%v", err)
return err
}
mb.broadcastRabbitConnStatus(true)
return err
}
//监听APP连接
func (mb *MqttBroker) ListenAndServe() {
defer mb.wg.Done()
var listener net.Listener
var err error
listener, err = net.Listen("tcp", mb.addr)
U.FailOnError(err)
U.GetLog().Printf("listen and serve mqtt broker on %s", mb.addr)
for {
conn, err := listener.Accept()
if err != nil {
U.GetLog().Printf("accepting new connection error:%v", err)
continue
}
mb.wg.Add(1)
mb.mqttconnChan <- conn
}
}
//针对每个APP连接,实例化MqttClient处理相关业务数据
func (mb *MqttBroker) handleMqttConnection() {
defer mb.wg.Done()
for conn := range mb.mqttconnChan {
mqttclient, err := NewMqttClient(mb.wg, conn, mb)
if err != nil {
U.GetLog().Printf("NewMqttClient error:%v", err)
continue
}
mb.clientMap[mqttclient.GetClientID()] = mqttclient
mb.wg.Add(1)
go mqttclient.Serve()
}
}
(4)MqttClient定义及实现:
定义相关数据结构以及方法如下:
package mqtt
//定义MqttClient结构
type MqttClient struct {
wg *sync.WaitGroup
tcpconn net.Conn
broker *MqttBroker
keepalive int
lastheartbeat int
clientid string
rabbit *rabbit.Rabbit
channel *amqp.Channel
exchangers map[string]Exchanger
topicMap map[string]chan ExchangeMessage
brokerChan chan *MqttClientCmd
rabbitconnChan chan bool
needDisConn bool
}
//监听处理APP的业务请求
func (mc *MqttClient) Serve() {
defer mc.wg.Done()
defer mc.commonDefer()
mc.wg.Add(1)
go mc.timer()
for {
if mc.needDisConn {
break
}
packet, err := MP.ReadPacket(mc.tcpconn)
if err != nil {
U.GetLog().Printf("ReadPacket error:%v", err)
mc.needDisConn = true
break
}
switch packet.GetMqttType() {
case MP.Connect:
err = mc.handleConn(packet)
case MP.Disconnect:
err = mc.handleDisConn(packet)
case MP.Pingreq:
err = mc.handlePing(packet)
case MP.Publish:
err = mc.handlePublish(packet)
case MP.Subscribe:
err = mc.handleSubscibe(packet)
case MP.Unsubscribe:
err = mc.handleUnSubscribe(packet)
default:
U.GetLog().Printf("unexpected packet type:%v", packet.GetMqttType())
}
if err != nil {
U.GetLog().Printf("handle packet error:%v", err)
}
if err == amqp.ErrClosed {
mc.channel = nil
U.GetLog().Printf("handle packet error amqp.ErrClosed:%v", amqp.ErrClosed)
continue
}
mc.lastheartbeat = 0
}
}
//处理mqtt连接报文
func (mc *MqttClient) handleConn(packet MP.ControlPacket) error {
U.GetLog().Printf("receive connect request...")
p := packet.(*MP.ConnectPacket)
pA := MP.NewControlPacket(MP.Connack).(*MP.ConnackPacket)
pA.ReturnCode = p.Validate()
mc.keepalive = int(p.KeepaliveTimer) //mc.keepalive == 10
if mc.broker != nil && mc.broker.Authenticate != nil {
authRet := mc.broker.Authenticate(mc, string(p.Username), string(p.Password))
if !authRet {
pA.ReturnCode = MP.ErrRefusedBadUsernameOrPassword
}
}
if pA.ReturnCode != MP.Accepted {
mc.needDisConn = true
}
err := mc.trySendPacket(pA)
return err
}
//给APP发送mqtt连接报文的回包
func (mc *MqttClient) trySendPacket(packet MP.ControlPacket) error {
// U.GetLog().Printf("try send packet:%v", packet.GetMqttType())
return packet.Write(mc.tcpconn)
}
说明:作为例子,以上仅以mqtt连接报文叙述了相关连接请求处理,包括用到了前一篇文字<<基于MQTT协议谈谈物联网开发>>中讲到的mqtt协议编解码方法;
3.运行测试
针对上述Mqtt Broker的具体实现,简单用nodejs编写了两个测试程序,一个是subscribe.js模拟订阅者,一个是publisher.js模拟生产者,如下:
生产者publisher.js:
var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'xixi', username:'hello', password: 'world', clean:false});
var num = 0;
setInterval(function () {
for(var i = 0; i < 1; i++) {
client.publish('someonelikeyou', 'hello mqtt ' + num,{qos:1, retain: true});
console.log("publish topic someonelikeyou, num:", num++);
}
}, 1000);
订阅者subscriber.js:
var mqtt = require('mqtt');
var client = mqtt.createClient(6666, '127.0.0.1', {clientId:'haha', username:'hello', password: 'world', clean:false});
client.subscribe('someonelikeyou',{qos:1});
console.log("subscribe topic someonelikeyou");
var num = 0;
client.on('message', function (topic, message) {
console.log(message.toString());
num++
});
运行结果如下图所示:
出于篇幅考虑,上述使用到的具体一些函数,如broadcastConnStatus,handlePing等,其具体实现就不一一列举出来了,也有一些成员变量,用到了也不一一具体注释了,主要通过代码关键路径叙述实现的一些细节,如有错误,恳请指出!!!
未完待续...
参考文字:mqtt