MQTT客户端实现
MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。
消息基类
所有消息的基类就是MqttWireMessage,核心的方法无非是封包/拆包,创建包头,读取playload等等。各个消息子类如MqttSubscribe等,继承自MqttWireMessage,需要实现getMessageInfo、getVariableHeader(构造包头),getPayload(构造body)等方法
重要接口
IMqttAsyncClient 声明了与MQTT Server交互时的重要方法,如connect、publish、subscribe等,这些方法是异步的,有两种调用方式
//方式一
IMqttToken conToken;
conToken = asyncClient.client.connect(conToken);
... do some work...
conToken.waitForCompletion();
//或者这样就可以把一个异步任务转化成同步调用
IMqttToken token;
token = asyncClient.method(parms).waitForCompletion();
//方式二,传入一个callback MqttAsyncActionListener,实现onSuccess及onFailure方法
IMqttToken conToken;
conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
log("Connected");
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log ("connect failed" +exception);
}
});
我们可以把此处的IMqttToken理解为对异步的任务信息、操作的封装,里面包含了getTopics、setActionCallback、waitForCompletion、isComplete、getMessageId、getResponse等方法
MqttAsyncClient是对IMqttAsyncClient接口的具体实现,里面包含两个重要的类:ClientComms,用来和服务器交互的类,封装了底层的网络调用;MqttClientPersistence,按照协议的QoS规定用来做消息的持久化。
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
final String methodName = "connect";
if (comms.isConnected()) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
...
//设置网络模块,NetworkModule是一个接口,包含start、stop、getInputStream、getOutputStream四个方法,
//有TCPNetworkModule、SSLNetworkModule、LocalNetworkModule等不同的实现
comms.setNetworkModules(createNetworkModules(serverURI, options));
// Insert our own callback to iterate through the URIs till the connect succeeds
//ConnectActionListener实现了IMqttActionListener接口,并把失败重试等逻辑都封装在该类里面;userToken是返回给上层调用者使用的对异步任务操作的封装类
MqttToken userToken = new MqttToken(getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
return userToken;
}
网络管理类
ClientComms是网络层重要的管理类,包含几个主要的类:
- NetworkModule 底层网络实现 CommsSender 和 CommsReceiver里的输入输出流来自于NetworkModule层的Socket
- CommsReceiver 接收消息 起一个线程,通过MqttInputStream解析出消息
- CommsSender 发送消息 起一个线程消费ClientState里的发送队列,通过MqttOutputStream往外写消息
- ClientState 管理消息的发送,里面有Vector pendingMessages pendingFlows 待发送的消息,结合不同的Qos进行处理
- CommsCallback 收到消息后的回调处理,是Receiver和外部API调用之间的桥梁
ClientComms连接服务器时,会在一个异步线程ConnectBG中执行,包括启动网络模块,初始化CommsReceiver、CommsSender等
// Connect to the server at the network level e.g. TCP socket and then
// start the background processing threads before sending the connect
// packet.
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
callback.start("MQTT Call: "+getClient().getClientId());
internalSend(conPacket, conToken);
通过internalSend方法,将要发送的消息插入到ClientState中的pendingMessages队列中,CommsSender会去消费这个队列,把消息取出来,写到MqttOutputStream里,并根据Qos设置做一些持久化操作。写成功后会有回调通知外部API调用。其他消息的发送也是这个流程。
//CommsSender中的run方法
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
}
}
}
CommsReceiver负责从InputStream读出消息,通过ClientState进行分发
public void run() {
final String methodName = "run";
MqttToken token = null;
while (running && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME,methodName,"852");
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
clientState.notifyReceivedAck((MqttAck)message);
}
} else {
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
} else {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
}
catch (MqttException ex) {
running = false;
// Token maybe null but that is handled in shutdown
clientComms.shutdownConnection(token, ex);
}
catch (IOException ioe) {
//@TRACE 853=Stopping due to IOException
log.fine(CLASS_NAME,methodName,"853");
running = false;
if (!clientComms.isDisconnecting()) {
clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
}
}
finally {
receiving = false;
}
}
而在ClientState中,通过notifyReceivedMsg方法接收到消息,根据不同的Qos做持久化操作,并最终调用了CommsCallback的messageArrived方法,将消息加入到一个messageQueue队列中。
CommsCallback里也起了一个线程消费这个队列,将取出的消息在handleMessage方法中通过MqttCallback接口回调出去。上文提到的发送完毕时,会将token插入到CommsCallback的completeQueue方法里进行消费,也是在这个run方法里
//CommsCallback里的run方法
while (running) {
try {
if (running) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (completeQueue) {
if (!completeQueue.isEmpty()) {
// First call the delivery arrived callback if needed
token = (MqttToken) completeQueue.elementAt(0);
completeQueue.removeElementAt(0);
}
}
if (null != token) {
handleActionComplete(token);
}
// Check for messageArrived callbacks...
MqttPublish message = null;
synchronized (messageQueue) {
if (!messageQueue.isEmpty()) {
message = (MqttPublish) messageQueue.elementAt(0);
messageQueue.removeElementAt(0);
}
}
if (null != message) {
handleMessage(message);
}
}
}
//处理发送完成的方法
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// Unblock any waiters and if pending complete now set completed
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
// If a callback is registered and delivery has finished
// call delivery complete callback.
if ( mqttCallback != null
&& token instanceof MqttDeliveryToken
&& token.isComplete()) {
mqttCallback.deliveryComplete((MqttDeliveryToken) token);
}
// Now call async action completion callbacks
fireActionEvent(token);
}
// Set notified so we don't tell the user again about this action.
if ( token.isComplete() ){
if ( token instanceof MqttDeliveryToken || token.getActionCallback() instanceof IMqttActionListener ) {
token.internalTok.setNotified(true);
}
}
if (token.isComplete()) {
clientState.notifyComplete(token);
}
}
}
使用介绍
以同步调用为例
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// Construct the connection options object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
conOpt.setCleanSession(clean);
if(password != null ) {
conOpt.setPassword(this.password.toCharArray());
}
if(userName != null) {
conOpt.setUserName(this.userName);
}
// Construct an MQTT blocking mode client
client = new MqttClient(this.brokerUrl,clientId, dataStore);
// Set this wrapper as the callback handler
client.setCallback(this);
} catch (MqttException e) {
e.printStackTrace();
log("Unable to set up client: "+e.toString());
System.exit(1);
}
设置各种连接参数,如用户名,密码,持久化存储路径等,并设置MqttCallback回调函数。
public interface MqttCallback {
//连接断开时的回调
public void connectionLost(Throwable cause);
//收到下推消息时的回调
public void messageArrived(String topic, MqttMessage message) throws Exception;
//消息发送成功时的回调
public void deliveryComplete(IMqttDeliveryToken token);
}
想要发布一个消息时,可以这样
public void publish(String topicName, int qos, byte[] payload) throws MqttException {
client.connect(conOpt);
String time = new Timestamp(System.currentTimeMillis()).toString();
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
client.publish(topicName, message);
client.disconnect();
}
这个publish方法是个同步方法,里面的实现其实是代理给异步client+wait阻塞实现的
public void publish(String topic, MqttMessage message) throws MqttException,
MqttPersistenceException {
aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
}
publish调用后将消息插入到ClientState的队列中,通过CommsSender线程中发送给服务器,发送完成时(或收到ack后)会回调MqttCallback接口中的deliveryComplete方法。用户还可以设置IMqttActionListener接口获取发送是成功还是失败的回调。如果收到一个新的消息,最终通过MqttCallback中的messageArrived回调给用户。