# Paho MQTT Android 源码分析 — MqttAndroidClient

客户端接口API

客户端有两种接口:

  1. IMqttClient

    提供阻塞方法,结束操作后返回调用方,是异步Client的使用到的较”薄“的一层设计,为早期版本的MQTT客户端使用。

  2. IMqttAsyncClient

    提供异步方法,调用方可通过获取返回值token的,使用waitForCompletion() 方式转变为阻塞同步方式

主要区别:1 为同步接口,2 为异步接口。

IMqttAsyncClient

异步接口,提供非阻塞式的方法,后台处理任务,以连接为例,连接到MQTT server是一个耗时操作,非阻塞方式在后台进行连接的时候,可以通知调用方,连接busy的状态。

非阻塞方式在时间驱动型程序和图形界面程序中较为多用,不会影响UI线程的绘制。

举例:连接,同步方式

// 方式1
IMqttToken conToken;
conToken = asyncClient.client.connect(conToken);
//... do some work...
conToken.waitForCompletion();

// 方式2
IMqttToken token;
token = asyncClient.method(parms).waitForCompletion();


连接,异步方式

IMqttToken conToken;
conToken = asyncClient.connect("some context",new new MqttAsyncActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
        log("Connected");
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        log ("connect failed" +exception);
    }
});

异步回调中如果需要使用context上下文,可以通过异步方法传入,最终可在回调中,返回给调用方。

关键方法

方法名 描述
connect 创建连接
disconnect 断开连接
disconnectForcibly 强制断开连接(用于disconnect失败后)
isConnected 判断是否连接
getClientId 获取Client ID
getServerURI 获取server的URI
publish 消息发布
subscribe 消息订阅
unsubscribe 取消消息订阅
setCallback 设置异步回调,监听
接收的消息
连接状态
消息发送结果
getPendingDeliveryTokens 获取为发送的消息的token,需cleanSession false才有效
setManualAcks 是否手动返回ACK消息
messageArrivedComplete 消息已成功送达,触发发送ack消息给server
close 释放所有client的资源,client被close后无法重复使用

IMqttClient

同步阻塞方法

关键方法

与异步client相似,~~表示同上

方法 描述
connect 建立连接,无返回
connectWithResult 建立连接,返回token
disconnect 同上~~
disconnectForcibly ~~
subscribe ~~,支持wildcard的 topicFilter
subscribeWithResponse 有返回的订阅,wildcard
unsubscribe ~~
publish ~~
setCallback ~~
getTopic 获取用于publish的主题
isConnected ~~
getServerURI ~~
getPendingDeliveryTokens ~~
setManualAcks ~~
messageArrivedComplete ~~
close ~~

MqttAndroidClient

Android client的主要实现类,extends BroadcastReceiver,实现IMqttAsyncClient

通过 Android的service服务于 MQTT服务进行通信。提供了包含 一下方法的简单易用的MQTT 客户端:

connect
publish
subscribe
unsubscribe
disconnect

连接 connect

主要进行的操作:

  1. 在没有service时,创建mqttService
  2. 注册广播监听
  3. 有service,则直接doConnect,无service,在连接后,进行doConnect
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) {
    // 创建 MqttTokenAndroid
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);

    connectOptions = options;
    connectToken = token;

    /*
    * 实际是通过service进行真正连接的,这里创建并bind service,
    * 真正使用 service 需要等待 service 中的异步方法
    * onServiceConnected() 连接成功,
    * connection itself takes place in the onServiceConnected() method
    */
    if (mqttService == null) { // First time - must bind to the service
        Intent serviceStartIntent = new Intent();
        serviceStartIntent.setClassName(myContext, SERVICE_NAME);
        Object service = myContext.startService(serviceStartIntent);
        if (service == null) {
            IMqttActionListener listener = token.getActionCallback();
            if (listener != null) {
                listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
            }
        }

        // 使用bind 方式启动service,需要注意声明周期管理,
        // startService 的启动最后要调用 stopService
        myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);

        // 注册广播监听
        if (!receiverRegistered) {
            registerReceiver(this);
        }
    } else {
        // 如已创建 service,则调用线程池进行连接,并注册广播监听
        pool.execute(new Runnable() {

            @Override
            public void run() {
                doConnect();

                //Register receiver to show shoulder tap.
                if (!receiverRegistered) {
                    registerReceiver(MqttAndroidClient.this);
                }
            }

        });
    }

    return token;
}

/**
 * ServiceConnection to process when we bind to our service
 */
private final class MyServiceConnection implements ServiceConnection {

    @Override
    public void onServiceConnected(ComponentName name, IBinder binder) {
        if (MqttServiceBinder.class.isAssignableFrom(binder.getClass())) {
            mqttService = ((MqttServiceBinder) binder).getService();
            bindedService = true;
            // now that we have the service available, we can actually
            // connect...
            doConnect();
        }
    }

    @Override
    public void onServiceDisconnected(ComponentName name) {
        mqttService = null;
    }
}

创建连接

  1. mqttService中获取Client句柄,
  2. 将token存入tokenMap
  3. 调用服务进行连接
/**
 * Actually do the mqtt connect operation
 */
private void doConnect() {
    // 从服务中获取client标识
    if (clientHandle == null) {
        clientHandle = mqttService.getClient(serverURI, clientId, myContext.getApplicationInfo().packageName, persistence);
    }
    // 配置服务是否trace
    mqttService.setTraceEnabled(traceEnabled);
    // 设置服务callback的 clientId
    mqttService.setTraceCallbackId(clientHandle);

    // 缓存token到 SparseArray
    String activityToken = storeToken(connectToken);
    try {
        // 调用服务建立连接,传入client标识和返回给客户端调用方的token缓存在tokenMap中的id
        mqttService.connect(clientHandle, connectOptions, activityToken);
    } catch (MqttException e) {
        IMqttActionListener listener = connectToken.getActionCallback();
        if (listener != null) {
            listener.onFailure(connectToken, e);
        }
    }
}

发布 publish

消息重发机制:在发送消息的时候,如果期间连接中断或client停止,消息会在满足所有以下条件且再次创建连接后,被已设定好的QoS被发送。

  • 用相同的ClientID 创建连接,
  • 之前和当前的连接均setCleanSession 为 false(不清楚缓存),
  • QoS > 0

关于Topic

  • 大小写敏感

  • 可以包含白字符

  • / 开头的Topic为独特Topic,注意通配符统配 e.g. /finance 不同于finance/finance 匹配"+/+" and "/+" 不匹配"+"

  • topic 长度限制 64k

  • topic 层数不限制

方式1:

public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean retained, Object userContext, IMqttActionListener callback) {

    // 根据 payload 构造MqttMessage
    MqttMessage message = new MqttMessage(payload);
    // 配置 QoS
    message.setQos(qos);
    // 配置是否在服务端进行保留
    message.setRetained(retained);
    // 创建token,注意区别于MqttTokenAndroid,继承于MqttTokenAndroid,拓展了MqttMessage字段
    MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
    // 缓存在 tokenMap 中
    String activityToken = storeToken(token);
    // 调用Service 发布消息,获取service缓存的internalToken
    IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, payload, qos, retained, null, activityToken);
    // 设置代理,通过设置代理的方式,其实就是个set字段,将两个token关联到了一起,内部处理消息的关系
    token.setDelegate(internalToken);
    return token;
}

方式2:

public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) {
    MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(this, userContext, callback, message);
    String activityToken = storeToken(token);
    IMqttDeliveryToken internalToken = mqttService.publish(clientHandle, topic, message, null, activityToken);
    token.setDelegate(internalToken);
    return token;
}

省略了方式1的 MqttMessage的构造。

订阅 subscribe

注意:

  • setCallback 要在subscribe之前配置,否则可能漏掉消息
  • 如果setCleanSession(true) 订阅一直到以下任意一种情况
    • 断开连接
    • 取消订阅
  • 如果setCleanSession(false) 订阅一直到以下任意一种情况
    • 取消订阅
    • 下一次client连接时候 setCleanSession(true)
  • topic filter 是带有特殊字符的字符串,可以允许订阅多个topics
  • topic tree 通过 / 进行层级划分,
  • Topic的通配符(wild card)
    • # 用于匹配任意多层的topic,使用注意:
      • 匹配层数:0层到n层,finance/# 匹配 finance
      • #必须是最后一个字符 finance/#/closingprice 非法
      • #必须在一层topic中使用 finance#非法
    • + 用于匹配单一层的topic,
      • 匹配层数:1,finance/+ 不匹配 finance
      • 可以使用在最后和中间

机制:

  • cleanSession是 false的时候,在客户端断开连接后,MQTT服务端会帮客户端存储消息,当下次具有相同ClientID的客户端再次连接后,服务端会下发消息到客户端上

订阅单一Topic

public IMqttToken subscribe(String topic, int qos, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback, new String[]{topic});
    String activityToken = storeToken(token);
    // 通过Service 订阅消息
    mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
    return token;
}

订阅多个Topic

优势:优化比逐一订阅

@Override
public IMqttToken subscribe(String[] topic, int[] qos, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback, topic);
    String activityToken = storeToken(token);
    mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
    return token;
}

取消订阅 unsubscribe

取消订阅与订阅是相反的,取消订阅需要在服务端收到取消后,查询是否有match的订阅,然后移除。

取消订阅

public IMqttToken unsubscribe(String topic, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.unsubscribe(clientHandle, topic, null, activityToken);
    return token;
}

取消多个订阅

@Override
public IMqttToken unsubscribe(String[] topic, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.unsubscribe(clientHandle, topic, null, activityToken);
    return token;
}

断开连接 disconnect

注意:

  • 在真正断开前允许重要的工作先完成后再真正的断开连接。
  • 一定不能在MqttCallback回调方法中调用,因为client会等待callback执行完后

断线时候消息发送的机制:

  • 断开消息前要先等待 MqttCallback 回调中方法执行完成,例如:在QoS=2 的消息开始发送后,disconnect会阻止新消息继续接收要发送的消息和已确认发送但还没开始发送的消息将被缓存,当QoS=2的work完成或超时时间到达,client会断开连接。如果 cleanSession = false 下次也为false,则QoS=1、2的消息将再次发送

方式1:

public IMqttToken disconnect() {
    IMqttToken token = new MqttTokenAndroid(this, null, null);
    String activityToken = storeToken(token);
    // 调用service断开连接
    mqttService.disconnect(clientHandle, null, activityToken);
    return token;
}

方式2:带超时机制

public IMqttToken disconnect(long quiesceTimeout) {
    IMqttToken token = new MqttTokenAndroid(this, null, null);
    String activityToken = storeToken(token);
    mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
    return token;
}

方式3:带回调

public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    String activityToken = storeToken(token);
    mqttService.disconnect(clientHandle, quiesceTimeout, null, activityToken);
    return token;
}

其他API

获取挂起的消息token

在客户端stop的时候,可能还有未发送的message,这种情况下,可以通过 getPendingDeliveryTokens 方法在客户端重启后获取为发送消息(in-flight message)的token,从而追踪这些消息的状态。

替代方法:MqttCallback中的deliveryComplete 也可以获取到消息送达的状态。

注意:

  • cleanSession = true 将清理所有缓存,将不会有 未发送的token,所以必须是:cleanSession=false
public IMqttDeliveryToken[] getPendingDeliveryTokens() {
    // 从service中根据client标识获取
    return mqttService.getPendingDeliveryTokens(clientHandle);
}

辅助debug功能

设置MqttTraceHandler,和使能在service中trace的功能

// 设置traceCallback,将从Service广播的trace action 透传给调用方
public void setTraceCallback(MqttTraceHandler traceCallback) {
    this.traceCallback = traceCallback;
}

/**
 * turn tracing on and off
 */
public void setTraceEnabled(boolean traceEnabled) {
    this.traceEnabled = traceEnabled;
    if (mqttService != null) {
        mqttService.setTraceEnabled(traceEnabled);
    }
}

/**
* Process trace action - pass trace data back to the callback
*/
private void traceAction(Bundle data) {

    if (traceCallback != null) {
        String severity = data.getString(MqttServiceConstants.CALLBACK_TRACE_SEVERITY);
        String message = data.getString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE);
        String tag = data.getString(MqttServiceConstants.CALLBACK_TRACE_TAG);
        if (MqttServiceConstants.TRACE_DEBUG.equals(severity)) {
            traceCallback.traceDebug(tag, message);
        } else if (MqttServiceConstants.TRACE_ERROR.equals(severity)) {
            traceCallback.traceError(tag, message);
        } else {
            Exception e = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
            traceCallback.traceException(tag, message, e);
        }
    }
}

MqttTraceCallback 是最简单的 MqttTraceHandler 的实现,直接输出到Android的Logcat

接收从MqttService返回

使用 BroadcastReceiver 的方式接收从 MqttService 的返回消息:

  • 由于Android机制,必须public
  • 但不应显示调用此方法
public void onReceive(Context context, Intent intent) {
    Bundle data = intent.getExtras();

    String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

    if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
        return;
    }

    String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

    if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
        connectAction(data);
    } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
        connectExtendedAction(data);
    } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
        messageArrivedAction(data);
    } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
        subscribeAction(data);
    } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
        unSubscribeAction(data);
    } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
        sendAction(data);
    } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
        messageDeliveredAction(data);
    } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
        connectionLostAction(data);
    } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
        disconnected(data);
    } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
        traceAction(data);
    } else {
        mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
    }

}

具体以sub的消息接收为参考(messageArrivedAction):

private void messageArrivedAction(Bundle data) {
    if (callback != null) {
        // 获取消息的 messageId destinationName
        String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
        String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

        // 获取Parcelable的消息
        ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
        try {
            if (messageAck == Ack.AUTO_ACK) {
                // 调用callback 方法中的 messageArrived
                callback.messageArrived(destinationName, message);
                // 自动acknowledge的话调用 service的接口
                mqttService.acknowledgeMessageArrival(clientHandle, messageId);
            } else {
                // 将messageId设置到message中
                message.messageId = messageId;
                // 调用callback 方法
                callback.messageArrived(destinationName, message);
            }

            // let the service discard the saved message details
        } catch (Exception e) {
            mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e);
        }
    }
}

移除token

从SparseArray的tokenMap中移除token:

private synchronized IMqttToken removeMqttToken(Bundle data) {

    String activityToken = data.getString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN);
    if (activityToken != null) {
        int tokenNumber = Integer.parseInt(activityToken);
        IMqttToken token = tokenMap.get(tokenNumber);
        tokenMap.delete(tokenNumber);
        return token;
    }
    return null;
}

注意:

  • 同步方法,加锁了
  • 调用的地方,在收到以下MqttService的广播后:
    • 收到CONNECT_ACTION
    • 收到DISCONNECT_ACTION
    • 收到SUBSCRIBE_ACTION:订阅
    • 收到UNSUBSCRIBE_ACTION:取消订阅
    • 收到MESSAGE_DELIVERED_ACTION :消息送达
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容