什么是MQTT
MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景。
与XMPP相比有什么特点
同MQTT类似的是XMPP协议,他们的特点点见下表:
MQTT | XMPP | |
---|---|---|
基于协议层 | TCP | TCP,也可以基于HTTP |
体积 | 小巧 | 庞大 |
适用场景 | 物联网 | 聊天 |
省流 | 省流量 | 费流量 |
省电 | 省电 | 费电 |
成熟度 | 不成熟 | 成熟 |
发布/订阅模式
与请求/回答这种同步模式不同,发布/订阅模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。打个比方,你打电话给朋友,一直要等到朋友接电话了才能够开始交流,是一个典型的同步请求/回答的场景;而给一个好友邮件列表发电子邮件就不一样,你发好电子邮件该干嘛干嘛,好友们到有空了去查看邮件就是了,是一个典型的异步发布/订阅的场景。
熟悉编程的同学一定非常熟悉这种设计模式了,因为它带来了这些好处:
发布者与订阅者不必了解彼此,只要认识同一个消息代理即可。
发布者和订阅者不需要交互,发布者无需等待订阅者确认而导致锁定。
发布者和订阅者不需要同时在线,可以自由选择时间来消费消息。
主题
MQTT是通过主题对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用就是了。
主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而#只能出现在主题最后表示过滤任意级别的层级。
举个例子:
building-b/floor-5:代表B楼5层的设备。
+/floor-5:代表任何一个楼的5层的设备。
building-b/#:代表B楼所有的设备。
注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。
协议介绍
MQTT的通信协议并不复杂,最核心的部分,我认为是他的16种控制类型,如下表:
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
我看MQTT协议内容的时候,发现有趣的一点就是他的字符长度是可变的,可以用ASCII,也可以用UTF-8,这个在我接触的其他协议里面是没有的,这样子的好处,显而易见的就是能减少传输流量。
什么是QoS
Qos的全称是服务质量(Quality of Service)。MQTT支持三种QoS,分别是0、1、2。级别越高,交互越复杂,越能保证正确性和到达率,但是开销也更大。他们的交互流程图如下:
- QoS 0: 尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
- QoS 1: 至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
-
QoS 2: 恰好一次。保证这种语义肯定会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。
服务质量是个老话题了。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。级别1提供的至少一次语义在日志处理这种场景下是完全OK的,所以像Kafka这类的系统利用这一特点减少确认从而大大提高了并发。级别0适合鸡肋数据场景,食之无味弃之可惜,就这么着吧。
相关阅读
Paho UML
说明:
上面三个类(MqttAndroidClient, MqttService, MqttConnection)是存在于android.paho库中,也就是说,这三个类是paho基于android的封装,而其余的类都是封装在java.paho中,处于底层库。
中间的纵轴线从上往下,MqttService,MqttConnection, MqttAsyncClient, ClientComms,这四个类都具备connect, publish, subscribe等开放给上层的基础方法,因为上层的调用都是通过这四个类传递到底层。
Paho运行的核心在于CommsCallback, CommsSender和CommsReceiver三个线程,分别用来做触发回调、发送消息和接收消息三个动作。
ClientState和CommsTokenStore维护着一套复杂的队列,三个线程通过使用这两个类,来实现线程间的同步和消息的排队。
-
ClientComms的功能除了基础方法接口之外,还维护底层三个线程的状态,这些状态主要包括:
connected
connecting
disconnected
disconnecting
closed
resting
MqttAsyncClient的功能除了基础方法接口之外,主要负责连接的重连,通过下面介绍的流程分析可以看出来,mqtt的重连机制就是在MqttAsynClient发起的。
MqttConnection是一个独立的mqtt连接,用于维护当前连接的所有状态。
流程分析
因为整个库的代码量比较大,我这里就不把代码贴出来了,这是我通过阅读代码整理出来的所有关键方法的流程,包括connect, publish 和reconnect,希望能对读者您有帮助。
- connect()
MqttAndroidClient.bindService()
-
MqttService.connect()
- MqttService.createConnection is not exists
-
MqttConnection.connect()
if option.isCleanSession(), then discard old data (we set cleanSession true)
callback action connect to activity when start connect fail, result fail or success
-
if MqttAsyncClient created
if isConnecting, return
if is connected, doAfterConnectSuccess
else MqttAsyncClient.connect
new MqttAsyncClient and connect
-
MqttAsyncClient.connect()
-
judge if need connect by ClientComms
if connected, throw exception
if connecting, throw exception
if disconnected, throw exception
if closed, throw exception
ClientComms.setReconnectCallback if options set automaticReconnect
new MqttToken as userToken
new ConnectActionListener and set comms, userToken
-
-
ConnectActionListener.connect()
persistence.open
comms.connect
-
ClientComms.connect()
if not disconnected or is closing, throw exception
tokenStore.open(), just set closeResponse null
ConnectBG.run
-
ConnectBG.run()
-
CommsReceiver.start()
runningSemaphore.acquire(), ensure just one receiver thread is running
while in.available && message instance of MqttAck
-
tokenStore.getToken
if token != null, synchronized token and clientState.notifyReceivedAck
else if instance of MqttPubRec MqttPubComp MqttPubAck, log itthese signal only receive after send
else throw exception
-
CommsSender.start()
runningSemaphore.acquire(), ensure just one sender thread is running
await clientState.pendingMessages.removeElementAt(0)
out.write(message)
-
CommsCallback.start()
runningSemaphore.acquire(), ensure just one callback thread is running
workAvailable.wait()
-
check if exists complete token, if true then handleActionComplete
if token.isComplete, notifyComplete
token.internalTok.notifyComplete
-
if token.isNotified
if mqttCallback != null and is DeliveryToken, notify deliveryComplete
if token.getActionCallback != null, notify onSuccess or onFail
token.setNotifiyed(true)
check if exists MqttPublish, if true then handleMessage(MqttPublish)this happens when register topic, not exists in DataCollector
-
internalSend
- clientState.send()
-
-
clientState.send()
token.setMessageId()
-
if MqttPublish
tokenStore.saveToken
pedingMessages.addElement
queueLock.notifyAll()
-
if MqttConnect
tokenStore.saveToken
pedingFlows.insertElementAt(0)
queueLock.notifyAll()
- publish()
MqttAndroidClient.publish()
MqttService.publish()
-
MqttConnection.publish()
-
if MqttAsyncClient.isConnected
MqttAsyncClient.publish
storeSendDetails
else, callbackToActivity error
-
-
MqttAsyncClient.publish()
new MqttPublish
comms.sendNoWait
-
ClientComms.sendNoWait()
-
if connected or trying to connect or trying to disconnect
if disconnectedBuffer is not empty, put message to thatafter reconnect, it will send disconnectBuffer message first, before send real-time message
-
internalSend()
token.setClient
clientState.send
else if disconnectedBuffer is not null, put message to that
else throw exception
-
ClientState.send()go to connect
- Reconnect automatic
ClientComms.shutdownConnection()
CommsCallback.connectionLost()
-
MqttAsyncClient.MqttReconnectCallback.connectinLost()
startReconnectCycle
run ReconnectTask after 1s
attempReconnect()
MqttAsyncClient.connect()go to connect .4