MQTT基于安卓的使用

前言

上一篇提到了MQTT的通用方式,由于智能家居TV的项目网络波动频繁,通用的方式已经无法满足需求,经常会出现重复订阅导致收到多条消息,那就只能另辟蹊径了,最终找到了梦寐以求的MqttAndroidClient。

1.集成

集成方式和上一篇的MQTT简介和使用要新增配置,build.gradle新增

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

manifest文件里面需要注册服务

<!-- Mqtt Service -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />

2.MqttAndroidClient重要源码解析

MqttAndroidClient是专门对MQTTClient的再封装拓展类,包含了订阅、连接以及多线程的处理,直接看MqttAndroidClient对于连接的封装源码,非关键代码已省略

    public IMqttToken connect(MqttConnectOptions options, Object userContext,
            IMqttActionListener callback) throws MqttException {
        if (mqttService == null) { 
        }
        else {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    doConnect();
                }
            });
        }
        return token;
    }

doConnect()连接操作放在子线程,有效避免网络波动连接时间过长阻塞主线程

private void doConnect() {
        ...
            mqttService.connect(clientHandle, connectOptions, null,
                    activityToken);
        ...
    }

阿里专门针对安卓客户端写了一个MQTTService,方便统一管理,除了连接操作,重连,断开连接都是在MQTTService中完成。

public void connect(String clientHandle, MqttConnectOptions connectOptions,
      String invocationContext, String activityToken)throws MqttSecurityException, MqttException {
        MqttConnection client = getConnection(clientHandle);
        client.connect(connectOptions, null, activityToken);
  }
public void connect(MqttConnectOptions options, String invocationContext,
            String activityToken) {
            ...
            if (myClient != null) {
                if (isConnecting ) {
                }else if(!disconnected){
                }
                else {                  
                    service.traceDebug(TAG, "myClient != null and the client is not connected");
                    service.traceDebug(TAG,"Do Real connect!");
                    setConnectingState(true);
                    myClient.connect(connectOptions, invocationContext, listener);
                }
            }
            ...
    }
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {
        final String methodName = "connect";
        if (comms.isConnected()) {
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
        }
        if (comms.isConnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
        }
        if (comms.isDisconnecting()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
        }
        if (comms.isClosed()) {
            throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
        }
        ...
        connectActionListener.connect();
        return userToken;
    }

源码自身对于isConnected、isConnecting、isDisconnecting、isClosed做了异常处理,避免正在连接或者断开连接时连接造成重复连接。后面的源码就没贴的必要了,就是开启一个连接线程。

3.MqttAndroidClient的使用

一行代码完成MQTT的连接

mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);

当然,这样是远远不够的,在实际应用中发现有一个问题,断网一段时间后重连网络MQTT不会自动重连,所以还得我们来做手动优化。思路很简单,在断开的节点开启重连线程,连接成功后关闭重连线程。以下贴的是完整代码,主要注意重连机制和订阅前记得取消订阅再订阅。(部分包含自己的代码,可以忽略,注释很详细)

public class MQTTManager {

    private Context mContext;
    private MqttAndroidClient mqttAndroidClient;
    private String clientId;//自定义

    private MqttConnectOptions mqttConnectOptions;

    private ScheduledExecutorService reconnectPool;//重连线程池

    public MQTTManager(Context mContext) {
        this.mContext = mContext;
    }

    public void buildClient() {
        closeMQTT();//先关闭上一个连接

        buildMQTTClient();
    }

    private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            TVLog.i("connect-"+"onSuccess");
            closeReconnectTask();
            subscribeToTopic();
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            //connect-onFailure-MqttException (0) - java.net.UnknownHostException
            TVLog.i("connect-"+ "onFailure-"+exception);
            startReconnectTask();
        }
    };

    private MqttCallback mqttCallback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            //close-connectionLost-等待来自服务器的响应时超时 (32000)
            //close-connectionLost-已断开连接 (32109)
            TVLog.i("close-"+"connectionLost-"+cause);
            if (cause != null) {//null表示被关闭
                startReconnectTask();
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String body = new String(message.getPayload());
            TVLog.i("messageArrived-"+message.getId()+"-"+body);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            try {
                TVLog.i("deliveryComplete-"+token.getMessage().toString());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    };

    private void buildMQTTClient(){
        mqttAndroidClient = new MqttAndroidClient(mContext, MQTTCons.Broker, clientId);
        mqttAndroidClient.setCallback(mqttCallback);

        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(20);
        mqttConnectOptions.setCleanSession(true);
        try {
            mqttConnectOptions.setUserName("Signature|" + MQTTCons.AcessKey + "|" + MQTTCons.instanceId);
            mqttConnectOptions.setPassword(MacSignature.macAndSignature(clientId, MQTTCons.SecretKey).toCharArray());
        } catch (Exception e) {
        }
        doClientConnection();
    }

    private synchronized void startReconnectTask(){
        if (reconnectPool != null)return;
        reconnectPool = Executors.newScheduledThreadPool(1);
        reconnectPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                doClientConnection();
            }
        } , 0 , 5*1000 , TimeUnit.MILLISECONDS);
    }

    private synchronized void closeReconnectTask(){
        if (reconnectPool != null) {
            reconnectPool.shutdownNow();
            reconnectPool = null;
        }
    }

    /**
     * 连接MQTT服务器
     */
    private synchronized void doClientConnection() {
        if (!mqttAndroidClient.isConnected()) {
            try {
                mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
                TVLog.d("mqttAndroidClient-connecting-"+mqttAndroidClient.getClientId());
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    private void subscribeToTopic() {//订阅之前会取消订阅,避免重连导致重复订阅
        try {
            String registerTopic = "";//自定义
            String controlTopic = "";//自定义
            String[] topicFilter=new String[]{registerTopic , controlTopic };
            int[] qos={0,0};
            mqttAndroidClient.unsubscribe(topicFilter, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    TVLog.i("unsubscribe-"+"success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    TVLog.i("unsubscribe-"+"failed-"+exception);
                }
            });
            mqttAndroidClient.subscribe(topicFilter, qos, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {//订阅成功
                    TVLog.i("subscribe-"+"success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//                    startReconnectTask();
                    TVLog.i("subscribe-"+"failed-"+exception);
                }
            });

        } catch (MqttException ex) {
        }
    }

    public void sendMQTT(String topicSep, String msg) {
        try {
            if (mqttAndroidClient == null)return;
            MqttMessage message = new MqttMessage();
            message.setPayload(msg.getBytes());
            String topic = "";//自定义
            mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
//                    TVLog.i("sendMQTT-"+"success:" + msg);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//                    startReconnectTask();
                    TVLog.i("sendMQTT-"+"failed:" + msg);
                }
            });
        } catch (MqttException e) {
        }
    }

    public void closeMQTT(){
        closeReconnectTask();
        if (mqttAndroidClient != null){
            try {
                mqttAndroidClient.unregisterResources();
                mqttAndroidClient.disconnect();
                TVLog.i("closeMQTT-"+mqttAndroidClient.getClientId());
                mqttAndroidClient = null;
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

}

调用方式如下,想要做线程安全的单例的可以自己封装

if (mqttManager == null)
            mqttManager = new MQTTManager(getApplicationContext());
mqttManager.buildClientId();

结语

基于安卓部分也算是完结了,里面也夹杂着一些源码解释,后续会写更多关于源码的解析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,105评论 1 32
  • 一、简历准备 1、个人技能 (1)自定义控件、UI设计、常用动画特效 自定义控件 ①为什么要自定义控件? Andr...
    lucas777阅读 5,213评论 2 54
  • 很久没有写东西了,半年搞1个半app,2个ipad项目,人已疯。。。。 今天在重构代码,总结一下:MQTT的使用和...
    今年我25阅读 58,721评论 59 141
  • 简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)...
    殖民_FE阅读 4,425评论 1 6
  • 蝉鸣恻恻催夜长, 莲卧荷塘凭波涨。 扇下轻风不知凉, 玲珑纱帐望梅香。
    姜海清阅读 1,085评论 30 40