客户端接口API
客户端有两种接口:
-
IMqttClient
提供阻塞方法,结束操作后返回调用方,是异步Client的使用到的较”薄“的一层设计,为早期版本的MQTT客户端使用。
-
IMqttAsyncClient
提供异步方法,调用方可通过获取返回值token的,使用
waitForCompletion()
方式转变为阻塞同步方式
主要区别:1 为同步接口,2 为异步接口。
IMqttAsyncClient
异步接口,提供非阻塞式的方法,后台处理任务,以连接为例,连接到MQTT server是一个耗时操作,非阻塞方式在后台进行连接的时候,可以通知调用方,连接busy的状态。
非阻塞方式在时间驱动型程序和图形界面程序中较为多用,不会影响UI线程的绘制。
举例:连接,同步方式
// 方式1
IMqttToken conToken;
conToken = asyncClient.client.connect(conToken);
//... do some work...
conToken.waitForCompletion();
// 方式2
IMqttToken token;
token = asyncClient.method(parms).waitForCompletion();
连接,异步方式
IMqttToken conToken;
conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
log("Connected");
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log ("connect failed" +exception);
}
});
异步回调中如果需要使用context上下文,可以通过异步方法传入,最终可在回调中,返回给调用方。
关键方法
方法名 | 描述 |
---|---|
connect | 创建连接 |
disconnect | 断开连接 |
disconnectForcibly | 强制断开连接(用于disconnect失败后) |
isConnected | 判断是否连接 |
getClientId | 获取Client ID |
getServerURI | 获取server的URI |
publish | 消息发布 |
subscribe | 消息订阅 |
unsubscribe | 取消消息订阅 |
setCallback | 设置异步回调,监听 接收的消息 连接状态 消息发送结果 |
getPendingDeliveryTokens | 获取为发送的消息的token,需cleanSession false才有效 |
setManualAcks | 是否手动返回ACK消息 |
messageArrivedComplete | 消息已成功送达,触发发送ack消息给server |
close | 释放所有client的资源,client被close后无法重复使用 |
IMqttClient
同步阻塞方法
关键方法
与异步client相似,~~
表示同上
方法 | 描述 |
---|---|
connect | 建立连接,无返回 |
connectWithResult | 建立连接,返回token |
disconnect | 同上~~ |
disconnectForcibly | ~~ |
subscribe | ~~,支持wildcard的 topicFilter |
subscribeWithResponse | 有返回的订阅,wildcard |
unsubscribe | ~~ |
publish | ~~ |
setCallback | ~~ |
getTopic | 获取用于publish的主题 |
isConnected | ~~ |
getServerURI | ~~ |
getPendingDeliveryTokens | ~~ |
setManualAcks | ~~ |
messageArrivedComplete | ~~ |
close | ~~ |
MqttAndroidClient
Android client的主要实现类,extends BroadcastReceiver
,实现IMqttAsyncClient
通过 Android的service服务于 MQTT服务进行通信。提供了包含 一下方法的简单易用的MQTT 客户端:
connect
publish
subscribe
unsubscribe
disconnect
连接 connect
主要进行的操作:
- 在没有service时,创建
mqttService
- 注册广播监听
- 有service,则直接doConnect,无service,在连接后,进行doConnect
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) {
// 创建 MqttTokenAndroid
IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
connectOptions = options;
connectToken = token;
/*
* 实际是通过service进行真正连接的,这里创建并bind service,
* 真正使用 service 需要等待 service 中的异步方法
* onServiceConnected() 连接成功,
* connection itself takes place in the onServiceConnected() method
*/
if (mqttService == null) { // First time - must bind to the service
Intent serviceStartIntent = new Intent();
serviceStartIntent.setClassName(myContext, SERVICE_NAME);
Object service = myContext.startService(serviceStartIntent);
if (service == null) {
IMqttActionListener listener = token.getActionCallback();
if (listener != null) {
listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
}
}
// 使用bind 方式启动service,需要注意声明周期管理,
// startService 的启动最后要调用 stopService
myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);
// 注册广播监听
if (!receiverRegistered) {
registerReceiver(this);
}
} else {
// 如已创建 service,则调用线程池进行连接,并注册广播监听
pool.execute(new Runnable() {
@Override
public void run() {
doConnect();
//Register receiver to show shoulder tap.
if (!receiverRegistered) {
registerReceiver(MqttAndroidClient.this);
}
}
});
}
return token;
}
/**
* ServiceConnection to process when we bind to our service
*/
private final class MyServiceConnection implements ServiceConnection {
@Override
public void onServiceConnected(ComponentName name, IBinder binder) {
if (MqttServiceBinder.class.isAssignableFrom(binder.getClass())) {
mqttService = ((MqttServiceBinder) binder).getService();
bindedService = true;
// now that we have the service available, we can actually
// connect...
doConnect();
}
}
@Override
public void onServiceDisconnected(ComponentName name) {
mqttService = null;
}
}
创建连接
- 从
mqttService
中获取Client句柄, - 将token存入tokenMap
- 调用服务进行连接
/**
* Actually do the mqtt connect operation
*/
private void doConnect() {
// 从服务中获取client标识
if (clientHandle == null) {
clientHandle = mqttService.getClient(serverURI, clientId, myContext.getApplicationInfo().packageName, persistence);
}
// 配置服务是否trace
mqttService.setTraceEnabled(traceEnabled);
// 设置服务callback的 clientId
mqttService.setTraceCallbackId(clientHandle);
// 缓存token到 SparseArray
String activityToken = storeToken(connectToken);
try {
// 调用服务建立连接,传入client标识和返回给客户端调用方的token缓存在tokenMap中的id
mqttService.connect(clientHandle, connectOptions, activityToken);
} catch (MqttException e) {
IMqttActionListener listener = connectToken.getActionCallback();
if (listener != null) {
listener.onFailure(connectToken, e);
}
}
}
发布 publish
消息重发机制:在发送消息的时候,如果期间连接中断或client停止,消息会在满足所有以下条件且再次创建连接后,被已设定好的QoS被发送。
- 用相同的ClientID 创建连接,
- 之前和当前的连接均setCleanSession 为 false(不清楚缓存),
- QoS > 0
关于Topic
大小写敏感
可以包含白字符
以
/
开头的Topic为独特Topic,注意通配符统配 e.g./finance
不同于finance
且/finance
匹配"+/+" and "/+"
不匹配"+"
topic 长度限制 64k
topic 层数不限制
方式1:
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, IMqttActionListener callback) {
// 根据 payload 构造MqttMessage
MqttMessage message = new MqttMessage(payload);
// 配置 QoS
message.setQos(qos);
// 配置是否在服务端进行保留
message.setRetained(retained);
// 创建token,注意区别于MqttTokenAndroid,继承于MqttTokenAndroid,拓展了MqttMessage字段
MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
// 缓存在 tokenMap 中
String activityToken = storeToken(token);
// 调用Service 发布消息,获取service缓存的internalToken
IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, payload, qos, retained, null, activityToken);
// 设置代理,通过设置代理的方式,其实就是个set字段,将两个token关联到了一起,内部处理消息的关系
token.setDelegate(internalToken);
return token;
}
方式2:
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) {
MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
String activityToken = storeToken(token);
IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, message, null, activityToken);
token.setDelegate(internalToken);
return token;
}
省略了方式1的 MqttMessage
的构造。
订阅 subscribe
注意:
- setCallback 要在subscribe之前配置,否则可能漏掉消息
- 如果
setCleanSession(true)
订阅一直到以下任意一种情况- 断开连接
- 取消订阅
- 如果
setCleanSession(false)
订阅一直到以下任意一种情况- 取消订阅
- 下一次client连接时候
setCleanSession(true)
-
topic filter
是带有特殊字符的字符串,可以允许订阅多个topics -
topic tree
通过/
进行层级划分, - Topic的通配符(wild card)
-
#
用于匹配任意多层的topic,使用注意:- 匹配层数:0层到n层,
finance/#
匹配finance
-
#
必须是最后一个字符finance/#/closingprice
非法 -
#
必须在一层topic中使用finance#
非法
- 匹配层数:0层到n层,
-
+
用于匹配单一层的topic,- 匹配层数:1,
finance/+
不匹配finance
- 可以使用在最后和中间
- 匹配层数:1,
-
机制:
- cleanSession是 false的时候,在客户端断开连接后,MQTT服务端会帮客户端存储消息,当下次具有相同ClientID的客户端再次连接后,服务端会下发消息到客户端上
订阅单一Topic
public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback) {
IMqttToken token = new MqttTokenAndroid(this, userContext, callback, new String[]{topic});
String activityToken = storeToken(token);
// 通过Service 订阅消息
mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
return token;
}
订阅多个Topic
优势:优化比逐一订阅
@Override
public IMqttToken subscribe(String[] topic, int[] qos, Object userContext, IMqttActionListener callback) {
IMqttToken token = new MqttTokenAndroid(this, userContext, callback, topic);
String activityToken = storeToken(token);
mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
return token;
}
取消订阅 unsubscribe
取消订阅与订阅是相反的,取消订阅需要在服务端收到取消后,查询是否有match的订阅,然后移除。
取消订阅
public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) {
IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
String activityToken = storeToken(token);
mqttService.unsubscribe(clientHandle, topic, null, activityToken);
return token;
}
取消多个订阅
@Override
public IMqttToken unsubscribe(String[] topic, Object userContext, IMqttActionListener callback) {
IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
String activityToken = storeToken(token);
mqttService.unsubscribe(clientHandle, topic, null, activityToken);
return token;
}
断开连接 disconnect
注意:
- 在真正断开前允许重要的工作先完成后再真正的断开连接。
- 一定不能在
MqttCallback
回调方法中调用,因为client会等待callback执行完后
断线时候消息发送的机制:
- 断开消息前要先等待
MqttCallback
回调中方法执行完成,例如:在QoS=2 的消息开始发送后,disconnect会阻止新消息继续接收要发送的消息和已确认发送但还没开始发送的消息将被缓存,当QoS=2的work完成或超时时间到达,client会断开连接。如果cleanSession = false
下次也为false,则QoS=1、2的消息将再次发送
方式1:
public IMqttToken disconnect() {
IMqttToken token = new MqttTokenAndroid(this, null, null);
String activityToken = storeToken(token);
// 调用service断开连接
mqttService.disconnect(clientHandle, null, activityToken);
return token;
}
方式2:带超时机制
public IMqttToken disconnect(long quiesceTimeout) {
IMqttToken token = new MqttTokenAndroid(this, null, null);
String activityToken = storeToken(token);
mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
return token;
}
方式3:带回调
public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) {
IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
String activityToken = storeToken(token);
mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
return token;
}
其他API
获取挂起的消息token
在客户端stop的时候,可能还有未发送的message,这种情况下,可以通过 getPendingDeliveryTokens
方法在客户端重启后获取为发送消息(in-flight message)的token,从而追踪这些消息的状态。
替代方法:MqttCallback中的deliveryComplete 也可以获取到消息送达的状态。
注意:
- cleanSession = true 将清理所有缓存,将不会有 未发送的token,所以必须是:cleanSession=false
public IMqttDeliveryToken[] getPendingDeliveryTokens() {
// 从service中根据client标识获取
return mqttService.getPendingDeliveryTokens(clientHandle);
}
辅助debug功能
设置MqttTraceHandler
,和使能在service中trace的功能
// 设置traceCallback,将从Service广播的trace action 透传给调用方
public void setTraceCallback(MqttTraceHandler traceCallback) {
this.traceCallback = traceCallback;
}
/**
* turn tracing on and off
*/
public void setTraceEnabled(boolean traceEnabled) {
this.traceEnabled = traceEnabled;
if (mqttService != null) {
mqttService.setTraceEnabled(traceEnabled);
}
}
/**
* Process trace action - pass trace data back to the callback
*/
private void traceAction(Bundle data) {
if (traceCallback != null) {
String severity = data.getString(MqttServiceConstants.CALLBACK_TRACE_SEVERITY);
String message = data.getString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE);
String tag = data.getString(MqttServiceConstants.CALLBACK_TRACE_TAG);
if (MqttServiceConstants.TRACE_DEBUG.equals(severity)) {
traceCallback.traceDebug(tag, message);
} else if (MqttServiceConstants.TRACE_ERROR.equals(severity)) {
traceCallback.traceError(tag, message);
} else {
Exception e = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
traceCallback.traceException(tag, message, e);
}
}
}
MqttTraceCallback
是最简单的 MqttTraceHandler
的实现,直接输出到Android的Logcat
接收从MqttService返回
使用 BroadcastReceiver
的方式接收从 MqttService
的返回消息:
- 由于Android机制,必须public
- 但不应显示调用此方法
public void onReceive(Context context, Intent intent) {
Bundle data = intent.getExtras();
String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
return;
}
String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);
if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
connectAction(data);
} else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
connectExtendedAction(data);
} else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
messageArrivedAction(data);
} else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
subscribeAction(data);
} else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
unSubscribeAction(data);
} else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
sendAction(data);
} else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
messageDeliveredAction(data);
} else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
connectionLostAction(data);
} else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
disconnected(data);
} else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
traceAction(data);
} else {
mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
}
}
具体以sub的消息接收为参考(messageArrivedAction):
private void messageArrivedAction(Bundle data) {
if (callback != null) {
// 获取消息的 messageId destinationName
String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
// 获取Parcelable的消息
ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
try {
if (messageAck == Ack.AUTO_ACK) {
// 调用callback 方法中的 messageArrived
callback.messageArrived(destinationName, message);
// 自动acknowledge的话调用 service的接口
mqttService.acknowledgeMessageArrival(clientHandle, messageId);
} else {
// 将messageId设置到message中
message.messageId = messageId;
// 调用callback 方法
callback.messageArrived(destinationName, message);
}
// let the service discard the saved message details
} catch (Exception e) {
mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e);
}
}
}
移除token
从SparseArray的tokenMap中移除token:
private synchronized IMqttToken removeMqttToken(Bundle data) {
String activityToken = data.getString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN);
if (activityToken != null) {
int tokenNumber = Integer.parseInt(activityToken);
IMqttToken token = tokenMap.get(tokenNumber);
tokenMap.delete(tokenNumber);
return token;
}
return null;
}
注意:
- 同步方法,加锁了
- 调用的地方,在收到以下MqttService的广播后:
- 收到CONNECT_ACTION
- 收到DISCONNECT_ACTION
- 收到SUBSCRIBE_ACTION:订阅
- 收到UNSUBSCRIBE_ACTION:取消订阅
- 收到MESSAGE_DELIVERED_ACTION :消息送达