一.MQTT
1.简介
MQTT(Message Queuing Telemetry Transport 消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是IBM开发的一个基于客户端-服务器的消息发布/订阅传输协议。
MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
2.特性
- 基于发布 / 订阅范式的 “轻量级” 消息协议(头部 2 字节)
- 专为资源受限的设备、低带宽占用高延时或者不可靠的网络设计,适用于 IoT 与 M2M
- 基于 TCP/IP 协议栈
- 实时的 IoT 通讯的标准协议
二.Mosquitto
1.简介
Mosquitto是一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单。
2.Broker
我们知道,网络间进行通信需要有Server和Client,在Mqtt中Broker扮演了Server的角色,基于mosquitto源码通过NDK进行编译生成android系统端可执行的bin文件,通过mosquitto -c mosquitto.conf来启动Broker;
3.版本和名称
Mosquitto会支持不同的协议版本号和名称,通过PROTOCOL_NAME和PROTOCOL_VERSION来进行区分,比如本文用到的mosquitto源码版本支持MQTTV3.1和MQTTV3.1.1,MQTTV3.1对应的协议name为"MQIsdp";
MQTTV3.1.1对应的协议name为"MQTT";版本和name必须匹配。
三.Client端实现
Client端实现主要分为三部分:Client端的创建;Client端连接;Client端消息注册;
1.Client端创建
在创建client时,需要初始化一些指定的参数,通过这些参数来处理与broker端的交互,包括连接,心跳,断开重连及设置状态回调等。
//用来存储Qos=1和2的消息
MemoryPersistence dataStore = new MemoryPersistence();
//保存着一些控制客户端如何连接到服务器的选项
MqttConnectOptions mConOpt = new MqttConnectOptions();
//set mqtt version
mConOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
/**
* set cleanSession
* false:broker will save connection record for client
* true:As a new client to connect broker every time[每次连接上都是一个新的客户端]
*/
mConOpt.setCleanSession(true);
// set heartbeat 30s[30S去检测一下broker是否有效,如果无效,会回调connectionLost]
mConOpt.setKeepAliveInterval(30);
// set username
if (userName != null) {
mConOpt.setUserName(userName);
}
// set password
if (password != null) {
mConOpt.setPassword(password.toCharArray());
}
//when disconnect unexpectly, broker will send "close" to clients which subscribe this topic to announce the connection is lost
mConOpt.setWill(topic, "close".getBytes(), 2, true);
//client reconnect to broker automatically[与broker断开后会去重连]
mConOpt.setAutomaticReconnect(true);
// create Mqtt client
if (sClient == null) {
sClient = new MqttClient(brokerUrl, clientId, dataStore);
// set callback[状态回调]
mCallback = new MqttCallbackBus(sInstance);
sClient.setCallback(mCallback);
}
2.Client端连接Broker
上一步创建了Client,并且初始化了各种参数,接下来调用connect进行连接,本文创建的是异步Client,设置了连接状态的回调IMqttActionListener,连接成功后可以进行topic的订阅,失败后可以进行重连。
// connect to broker
sClient.connect(mConOpt);
//异步的client,同步连接没有状态回调
mClient = new MqttAsyncClient(brokerUrl, clientId, dataStore);
mClient.connect(mConOpt, null, mIMqttActionListener);
//连接状态回调
private IMqttActionListener mIMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
try {
Log.i(TAG, "connect success");
......
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
try {
Log.e(TAG, "connect failure, reconnect");
......
} catch (Exception e) {
e.printStackTrace();
}
}
};
3.状态回调
在第一步进行client创建时传入了MqttCallback,在与broker断开连接、新消息到达、消息发送完成后,通过该MqttCallback会收到对应的回调,具体如下:
public class MqttCallbackBus implements MqttCallback {
private static final String TAG = MqttCallbackBus.class.getSimpleName();
private MqttManager mMqttManager;
public MqttCallbackBus(MqttManager mqttManager) {
mMqttManager = mqttManager;
}
@Override
public void connectionLost(Throwable cause) {
Log.e(TAG, "cause : " + cause.toString());
//与broker断开后回调,[虽然上边属性中设置了自动重连,但是连上后不会去订阅topic,即使连上也接收不到topic,因此选择在此手动连接,然后在连接成功后订阅topic]
mMqttManager.reconnectBroker();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e(TAG, "topic : " + topic + "\t MqttMessage : " + message.toString());
//订阅的消息接收到后回调
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.e(TAG, "token : " + token.toString());
//消息publish完成后回调
}
}
四.Eclipse paho源码分析
Client端是基于Eclipse Paho提供的mqtt开源库进行实现,接下来对Eclipse Paho源码进行分析:
1.MqttConnectOptions.java
//设置是否重连
public void setAutomaticReconnect(boolean automaticReconnect) {
this.automaticReconnect = automaticReconnect;
}
//获取是否设置了重连标志,确定后续是否进行重连
public boolean isAutomaticReconnect() {
return automaticReconnect;
}
2.MqttAsyncClient.java
创建mqtt async client,包括一些实例初始化等,程序最重要的入口类。
2.1.构造方法
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
....
....
MqttConnectOptions.validateURI(serverURI);
this.serverURI = serverURI;
this.clientId = clientId;
this.persistence = persistence;
if (this.persistence == null) {
}
this.persistence.open(clientId, serverURI);
//创建了ClientComms,最终去跟broker建立连接
this.comms = new ClientComms(this, this.persistence, pingSender);
this.persistence.close();
this.topics = new Hashtable();
}
在构造方法内,进行了一些变量赋值,然后创建ClientComms实例,该实例用来跟broker建立连接,后面会进行分析;
2.2.connect()
在创建完实例后,调用connect()去跟broker建立连接,看一下connect()方法的具体实现:
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
....
this.connOpts = options;
this.userContext = userContext;
final boolean automaticReconnect = options.isAutomaticReconnect();
....
comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttCallbackExtended() {
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectComplete(boolean reconnect, String serverURI) {
}
public void connectionLost(Throwable cause) {
if(automaticReconnect){
// Automatic reconnect is set so make sure comms is in resting state
comms.setRestingState(true);
reconnecting = true;
//设置了重连,在收到connectionLost后,进行重连
startReconnectCycle();
}
}
});
// Insert our own callback to iterate through the URIs till the connect succeeds
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
......
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
}
在connect()内部主要做了以下几件事:
1.通过createNetworkModules()创建NetworkModule,包含serverURl,最终创建的URI_TYPE_TCP,对应的是TCPNetworkModule;
2.调用setReconnectCallback()来设置重连,状态断开时会进行自动重连;
3.创建ConnectActionListener对象,传入了comms、callback等参数,连接状态onSuccess()和onFailure()是在ConnectActionListener里面进行回调的;
4.执行ConnectActionListener的connect()进行连接;
2.3.startReconnectCycle()
前面讲到,如果设置了automaticReconnect,则在异常断开后会调用startReconnectCycle()进行重连:
//1.重连循环
private void startReconnectCycle() {
....
reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}
//2.重连task
private class ReconnectTask extends TimerTask {
private static final String methodName = "ReconnectTask.run";
public void run() {
attemptReconnect();
}
}
//3.重连入口
private void attemptReconnect(){
....
try {
//连接
connect(this.connOpts, this.userContext,new IMqttActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
....
comms.setRestingState(false);
//重连成功,结束重连循环
stopReconnectCycle();
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
....
//继续重连,下一次重连时间是上一次的两倍,最高是128s
if(reconnectDelay < 128000){
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}
});
....
}
//设置Mqttcallback
public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
//将MqttCallbackBus回调设置给ClientComms,后续的回调供client使用,此处主要用到onConnectionLost()
comms.setCallback(callback);
}
3.ConnectActionListener.java
通过以上可以看到,connect()方法中,最终调用的是connectActionListener的connect()方法,一起看一下该方法的具体实现:
3.1.connect()
public void connect() throws MqttPersistenceException {
//创建MqttToken
MqttToken token = new MqttToken(client.getClientId());
//设置callback,由于connectActionListener实现了IMqttActionListener,即把自己注册进去
token.setActionCallback(this);
token.setUserContext(this);
......
try {
//调用comms的connect,comms是在创建client里面创建的,在connect时传入connectActionListener里面
comms.connect(options, token);
} catch (MqttException e) {
onFailure(token, e);
}
}
3.2.onSuccess()和onFailure()
public void onSuccess(IMqttToken token) {
....
if (userCallback != null) {
//回调传入的IActionListener回调,该userCallback是在AsyncClient.connect()是传入的IMqttActionListener
userCallback.onSuccess(userToken);
}
....
}
public void onFailure(IMqttToken token, Throwable exception) {
....
if (userCallback != null) {
//回调传入的IActionListener回调,该userCallback是在AsyncClient.connect()是传入的IMqttActionListener
userCallback.onFailure(userToken, exception);
}
....
}
}
4.ClientComms.java
该类也是一个非常重要的类,主要创建了三个线程和ClientState实例,构造方法中创建了CommsCallback线程和ClientState实例,接着上步会调用到connect()方法,看一下该方法的实现逻辑:
4.1.connect()
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
//设置状态为连接中
conState = CONNECTING;
conOptions = options;
//创建MqttConnect,表示与broker连接的message
MqttConnect connect = new MqttConnect(client.getClientId(),
conOptions.getMqttVersion(),
conOptions.isCleanSession(),
conOptions.getKeepAliveInterval(),
conOptions.getUserName(),
conOptions.getPassword(),
conOptions.getWillMessage(),
conOptions.getWillDestination());
......
tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start();
}
}
}
在connect()内部创建了MqttConnect,表示是连接message,然后创建ConnectBG实例并执行start();
4.2.ConnectBG
private class ConnectBG implements Runnable {
ClientComms clientComms = null;
Thread cBg = null;
MqttToken conToken;
MqttConnect conPacket;
ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) {
clientComms = cc;
conToken = cToken;
conPacket = cPacket;
cBg = new Thread(this, "MQTT Con: "+getClient().getClientId());
}
void start() {
cBg.start();
}
public void run() {
......
try {
........
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
//CommsCallback本身是一个线程,启动
callback.start("MQTT Call: "+getClient().getClientId());
internalSend(conPacket, conToken);
}
.......
if (mqttEx != null) {
//不为空,说明进入了catch,则shut down
shutdownConnection(conToken, mqttEx);
}
}
}
//接着AsyncClient.setCallback,会调用ClientComms设置MqttCallbackBus回调
public void setCallback(MqttCallback mqttCallback) {
this.callback.setCallback(mqttCallback);
}
从上面的代码可以看到,在执行connect()方法后,主要做了以下几项工作:
1.networkModule.start():创建socket,与broker建立连接;
//TcpNetworkModule.java
public void start() throws IOException, MqttException
try {
......
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
......
}
.......
}
2.创建CommsReceiver()然后start(),不断循环读取Broker端来的消息;
//CommsReceiver.java
public void run() {
......
while (running && (in != null)) {
try {
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
clientState.notifyReceivedAck((MqttAck)message);
}
}
} else {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
}
........
........
}
3.创建CommsSender实例,然后start(),主要用来向Broker发送消息;
//CommsSender.java
public void run() {
......
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} ......
clientState.notifySent(message);
}
}
}
}
}
}
......
}
4.CommsCallback.start():通过该类来实现broker返回消息的回调处理入口,后面会讲到。
5.internalSend(conPacket, conToken):发送连接action
5.ClientState.java
client在进行消息publish时,先经过ClientComms,最终会调用到ClientState里面的send()方法,看一下send()方法的实现逻辑:
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
......
//message是publish型的message
if (message instanceof MqttPublish) {
synchronized (queueLock) {
......
//获取到要发送的Message
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//获取到要发送Message的Qos
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
//加入pendingMessages,CommsSender通过get()方法获取到message后就会进行发送
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
//其他类型的message
if (message instanceof MqttConnect) {
synchronized (queueLock) {
// Add the connect action at the head of the pending queue ensuring it jumps
// ahead of any of other pending actions.
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttPingReq) {
this.pingCommand = message;
}
else if (message instanceof MqttPubRel) {
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
else if (message instanceof MqttPubComp) {
persistence.remove(getReceivedPersistenceKey(message));
}
synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
}
}
//连接成功
public void connected() {
final String methodName = "connected";
//@TRACE 631=connected
log.fine(CLASS_NAME, methodName, "631");
this.connected = true;
//启动pingSender,来在keepAliveInterval内发送心跳包
pingSender.start(); //Start ping thread when client connected to server.
}
在CommsReceiver收到broker的ack及普通消息后,会先经过clientState,具体会调用以下两个方法:
notifyReceivedAck():连接成功ack、qos=1和2时publish消息后的ack、心跳相关ack等都会通过该方法调用CommsCallback的方法。
notifyReceivedMsg():正常的publish消息等。
pingSender.start():表示client在连上broker后会在aliveInterval后发送心跳包,pingSender是在创建client时就创建了TimerPingSender实例,一步步先传给clientComms,再传给ClientState,执行start()来创建Timer,执行TimerTask来最终clientState的checkForActivity(),将PingReq加入pendingFlows后,queueLock.notifyAll(),调用CommsSender来进行发送,然后加入下一轮执行。如果正常的话,会收到PingResp,修改lastInboundActivity和pingOutstanding的值来在下一轮执行check时来判断是否收到心跳,异常就抛出REASON_CODE_CLIENT_TIMEOUT(32000)码。
6.CommsCallback.java
通过该类来实现broker返回消息的回调处理入口,包括处理断开、消息发送成功,消息到达、连接状态回调等
public void start(String threadName) {
synchronized (lifecycle) {
if (!running) {
// Preparatory work before starting the background thread.
// For safety ensure any old events are cleared.
messageQueue.clear();
completeQueue.clear();
running = true;
quiescing = false;
callbackThread = new Thread(this, threadName);
callbackThread.start();
}
}
}
public void run() {
final String methodName = "run";
while (running) {
try {
//没有work时,wait(),当有message来时,通过workAvailable.notifyAll()来唤醒
try {
synchronized (workAvailable) {
if (running && messageQueue.isEmpty()
&& completeQueue.isEmpty()) {
workAvailable.wait();
}
}
} catch (InterruptedException e) {
}
if (running) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (completeQueue) {
if (!completeQueue.isEmpty()) {
// First call the delivery arrived callback if needed
token = (MqttToken) completeQueue.elementAt(0);
completeQueue.removeElementAt(0);
}
}
if (null != token) {
handleActionComplete(token);
}
// Check for messageArrived callbacks...
MqttPublish message = null;
synchronized (messageQueue) {
if (!messageQueue.isEmpty()) {
// Note, there is a window on connect where a publish
// could arrive before we've
// finished the connect logic.
message = (MqttPublish) messageQueue.elementAt(0);
messageQueue.removeElementAt(0);
}
}
.......
} finally {
synchronized (spaceAvailable) {
spaceAvailable.notifyAll();
}
}
}
}
//连接回调方法入口,通过handleActionComplete()来调用
public void fireActionEvent(MqttToken token) {
final String methodName = "fireActionEvent";
if (token != null) {
//此处的asyncCB就是在MqttAsyncClient.connect()里面传入的ConnectActionListener,参考如下:
//ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
//userToken.setActionCallback(connectActionListener);
IMqttActionListener asyncCB = token.getActionCallback();
if (asyncCB != null) {
if (token.getException() == null) {
//回调ConnectActionListener的onSuccess的方法
asyncCB.onSuccess(token);
} else {
//回调ConnectActionListener的onFailure的方法
asyncCB.onFailure(token, token.getException());
}
}
}
}
//设置MqttCallbackBus回调
public void setCallback(MqttCallback mqttCallback) {
this.mqttCallback = mqttCallback;
}
//断开回调MqttCallbackBus接口
public void connectionLost(MqttException cause) {
try {
if (mqttCallback != null && cause != null) {
//回调MqttCallbackBus的connectionLost接口
mqttCallback.connectionLost(cause);
....
}
....
}
....
}
7.客户端连接流程图
总结一下client端连接broker的流程图,黑色的线代表client端从创建到跟broker进行connect()的调用流程,绿色的线代表从broker收到回复消息后的调用流程。
8.客户端发送及订阅接收消息流程图
五.qos值及其含义
1.至多一次
消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
2.至少一次
确保消息到达,但消息可能会重复发生。
3.只有一次
确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
4.源码分析
从Broker来的ack消息是通过CommsReceiver来接收的,接收后会调用ClientState的notifyReceiveAck()方法,结合上面的图及代码一起看一下:
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
final String methodName = "notifyReceivedAck";
this.lastInboundActivity = System.currentTimeMillis();
MqttToken token = tokenStore.getToken(ack);
MqttException mex = null;
if (token == null) {
} else if (ack instanceof MqttPubRec) {
MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
//收到MqttPubRec后,创建MqttPubRel进行回复
this.send(rel, token);
} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
//qos = 1或2时,收到MqttPubAck或MqttPubComp来通知deliveryComplete回调,及删除message
notifyResult(ack, token, mex);
} else if (ack instanceof MqttPingResp) {
synchronized (pingOutstandingLock) {
pingOutstanding = Math.max(0, pingOutstanding-1);
notifyResult(ack, token, mex);
if (pingOutstanding == 0) {
tokenStore.removeToken(ack);
}
}
//@TRACE 636=ping response received. pingOutstanding: {0}
log.fine(CLASS_NAME,methodName,"636",new Object[]{ new Integer(pingOutstanding)});
} else if (ack instanceof MqttConnack) {
......
//连接成功的回调
......
} else {
......
}
checkQuiesceLock();
}
Subscriber在收到message后,会执行到ClientState.notifyReceivedMsg()方法,该方法会根据qos的值来做相应的处理,qos=0或1,直接会调用messageArrived,然后删除persistence,send(PubAck);
qos=2时,先存储,然后send(PubRec),接下来会收到broker的PubRel,如果能找到msg,则会调用messageArrived,然后send(PubComp),删除persistence;如果再次收到PubRel,找不到msg,则直接send(PubComp),确保只执行一次messageArrived。
//ClientState.java
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = "notifyReceivedMsg";
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(CLASS_NAME, methodName, "651", new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish) message;
switch (send.getMessage().getQos()) {
case 0:
case 1:
//Qos=1或0,直接执行
if (callback != null) {
callback.messageArrived(send);
}
break;
case 2:
//Qos=2,先存储,然后发送PubRec
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
break;
default:
//should NOT reach here
}
} else if (message instanceof MqttPubRel) {
//收到PubRel后,先从inboundQoS2找msg
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
//找到msg,表示还未执行messageArrived,先执行
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
}
} else {
//找不到说明已经执行了messageArrived,直接发送PubComp
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
}
}
}
}
clientState的notifyResult()方法,经过一系列调用,最终会通过CommsCallback中的workAvailable.notifyAll()---->run()---->handleActionComplete()方法,看一下该方法的实现:
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
if (token.isComplete()) {
//删除message
clientState.notifyComplete(token);
}
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
//回调deliveryComplete()方法
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
//内部逻辑只有在connect()时才会调用,publish()时,Listener为null,不会执行
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
}
}
以下是在收到MqttPubAck或MqttPubComp后,会将存储的persistence删除掉。
//ClientState.java
protected void notifyComplete(MqttToken token) throws MqttException {
final String methodName = "notifyComplete";
MqttWireMessage message = token.internalTok.getWireMessage();
if (message != null && message instanceof MqttAck) {
// @TRACE 629=received key={0} token={1} message={2}
log.fine(CLASS_NAME, methodName, "629", new Object[] {
new Integer(message.getMessageId()), token, message });
MqttAck ack = (MqttAck) message;
if (ack instanceof MqttPubAck) {
// QoS 1 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
outboundQoS1.remove(new Integer(ack.getMessageId()));
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 650=removed Qos 1 publish. key={0}
log.fine(CLASS_NAME, methodName, "650",
new Object[] { new Integer(ack.getMessageId()) });
} else if (ack instanceof MqttPubComp) {
// QoS 2 - user notified now remove from persistence...
persistence.remove(getSendPersistenceKey(message));
persistence.remove(getSendConfirmPersistenceKey(message));
outboundQoS2.remove(new Integer(ack.getMessageId()));
inFlightPubRels--;
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 645=removed QoS 2 publish/pubrel. key={0}, -1 inFlightPubRels={1}
log.fine(CLASS_NAME, methodName, "645", new Object[] {
new Integer(ack.getMessageId()),
new Integer(inFlightPubRels) });
}
checkQuiesceLock();
}
}
以上逻辑主要是发送或接收Qos=1和Qos=2的message后,publisher与Broker、Broker与Subscriber之间的交互流程。
看一下总的流程图:
5.总结
Qos0:消息不存persistence,publish后直接通过notifySent()后来complete;
Qos1:publisher:消息存persistence,publish后收到PubAck后来进行complete及persistence.remove();
subscriber:notifyReceivedMsg后,先deliverMessage(),然后send(PubAck);
Qos2:publisher:消息存persistence,publish后会收到PubRec,然后发送PubRel,再收到PubComp后进行complete及persistence.remove();
subscriber:notifyReceivedMsg后,先persistence.put(),inboundQos2.put(),然后send(PubRec)到Broker,收到来自Broker的PubRel后,再次notifyReceivedMsg,执行deliverMessage(),send(PubComp),删除persistence。如果再次收到PubRel,不会进行deliverMessage(),直接send(PubComp)。
六.心跳机制
1.Keep Alive指定连接最大空闲时间T,当客户端检测到连接空闲时间超过T时,必须向Broker发送心跳报文PINGREQ,Broker收到心跳请求后返回心跳响应PINGRESP。
2.若Broker超过1.5T时间没收到心跳请求则断开连接,并且投递遗嘱消息到订阅方;同样,若客户端超过一定时间仍没收到Broker心跳响应PINGRESP则断开连接。
3.连接空闲时发送心跳报文可以降低网络请求,弱化对带宽的依赖。
七.保留消息定义[retained]
如果Publish消息的retained标记位被设置为1,则称该消息为“保留消息”;
Broker对保留消息的处理如下:
Broker会存储每个Topic的最后一条保留消息及其Qos,当订阅该Topic的客户端上线后,Broker需要将该消息投递给它。
保留消息作用:
可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送。
保留消息的删除:
方式1:发送空消息体的保留消息;
方式2:发送最新的保留消息覆盖之前的(推荐);
八.完全实现解耦
MQTT这种结构替代了传统的客户端/服务器模型,可以实现以下解耦:
空间解耦:发布者和订阅者不需要知道对方;
时间解耦:发布者和订阅者不需要同时运行(离线消息, retained = 1的话,可以实现);
同步解耦:发布和接收都是异步通讯,无需停止任何处理;
九.与HTTP比较:
MQTT最长可以一次性发送256MB数据;
HTTP是典型的C/S通讯模式:请求从客户端发出,服务端只能被动接收,一条连接只能发送一次请求,获取响应后就断开连接;
HTTP的请求/应答方式的会话都是客户端发起的,缺乏服务器通知客户端的机制,客户端应用需要不断地轮询服务器;
十.mosquitto.conf
可以通过以下输出mosquitto日志:
log_dest file /storage/emulated/0/mosquitto.log //Android系统输出到文件
log_dest stdout // linux系统直接输出到工作台
log_type all
十一.mqtt本地调试
1.启动broker,mosquitto – 代理器主程序
./mosquitto &
其中:broker ip 为电脑端ip;port:默认1883;
2.mosquitto_pub – 用于发布消息的命令行客户端,向已订阅的topic发布消息
./mosquitto_pub -h host -p port -t topic -m message
其中:-h:broker ip;-p:端口号,默认1883;-t:已订阅的topic;-m:发布的消息
举例
./mosquitto_pub -h 10.10.20.10 -p 1883 -t baiduMap -m "{"origin": "西直门","end":"东直门"}"
3.mosquitto_sub – 用于订阅消息的命令行客户端,订阅topic
./mosquitto_sub -h host -p port -t topic
其中:-h:broker ip;-p:端口号,默认1883;-t:需要订阅的topic
4.运行环境
ubuntu系统,将libmosquitto.so.1放入系统变量中:
export LD_LIBRARY_PATH= 附件libmosquitto.so.1路径:$PATH
5.查看连接broker的客户端
broker默认的端口号1883,可以通过端口号来查看已连接的客户端
netstat -an | grep :1883
tcp 0 0 0.0.0.0:1883 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:1883 127.0.0.1:44618 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.1:45256 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.2:49612 ESTABLISHED
tcp 0 0 172.27.117.1:1883 172.27.117.2:49508 ESTABLISHED
可以看到连接broker的客户端为4个;
以上分别介绍了MQTT协议以及服务端mosquitto、客户端Eclipse paho的使用及源码介绍!