Eclipse Paho 实现Android推送

Eclipse Paho:是Eclipse提供的一个访问MQTT服务器的一种开源客户端库。

MQTT Client 对照表

Eclipse目前提供十种不同语言平台的客户端类库,
对于Java平台而言和MQTT服务器交互的开源框架还有很多, 例如:
Eclipse Paho Java、 Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等等...
但是, 根据GIthub上使用次数来讲Eclipse Paho无疑是主流, 就个人使用而已, Eclipse Paho集成非常方便、简单。
对MQTT协议不是很了解的可以看一下
Eclipse Paho 官网
Paho.client.mqttv3 在线API

注意:

QoS:服务质量,其作用是当网络过载或拥塞时,QoS 能确保重要业务量不受延迟,同时保证网络的高效运行

  • 服务质量0 - 表示一条消息最多应该发送一次(零次或一次)。该消息不会持久保存到磁盘,并且不会通过网络进行确认。这种QoS是最快的,但只能用于无价值的消息
  • 服务质量1 - 表示一条消息应至少传递一次(一次或多次)。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。该消息将通过网络得到确认。这是默认的QoS。
  • 服务质量2 - 表示应该传递一次消息。该消息将被保存到磁盘,并且将受到整个网络的两阶段确认。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。

如果未配置持久性,则在发生网络或服务器问题时,仍会传送QoS 1和2消息,因为客户端将在内存中保持状态。如果MQTT客户端关闭或失败,并且未配置持久性,则由于客户端状态将丢失,因此无法保持QoS 1和2消息的传输。

publish: 向服务器上的主题发布消息, 主要用于即时通讯(IoT、 聊天...)

subscribe: 订阅主题, 服务器通过你订阅的主题向你发布消息, 这样就能实现服务器向APP推送信息了

ClientId: 客户端ID应该是唯一的, 多个用户同时使用同一个ID创建连接会不断挤线互踢导致连接不断中断, 服务器也是通过ID来区分不同用户的操作


开始使用Eclipse Paho



1. 配置maven库


repositories {

...

    maven {

        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"

    }

...

}



2.添加依赖

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'  // MQTT Eclipse paho



3.添加权限

<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.INTERNET"/>



4.编写逻辑代码

4.1新建一个MqttManager类, 用于MQTT操作的管理

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

/**
 * @author Ai(陈祥林)
 * @date 2018/1/3  10:37
 * @email Webb@starcc.cc
 */
public class MqttManager {

    private static MqttManager mInstance = null;
    /**
     * Mqtt回调
     */
    private MqttCallback mCallback;
    /**
     * Mqtt客户端
     */
    private static MqttClient client;
    /**
     * Mqtt连接选项
     */
    private MqttConnectOptions conOpt;

    private MqttManager() {
        mCallback = new MqttCallbackBus();
    }

    public static MqttManager getInstance() {
        if (null == mInstance) {
            synchronized (MqttManager.class) {
                if (mInstance == null) {
                    mInstance = new MqttManager();
                }
            }
        }
        return mInstance;
    }

    /**
     * 释放单例, 及其所引用的资源
     */
    public static void release() {
        try {
            if (mInstance != null) {
                disConnect();
                mInstance = null;
            }
        } catch (Exception e) {
            Log.e("MqttManager", "release : " + e.toString());
        }
    }

    /**
     * 创建Mqtt 连接
     *
     * @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
     * @param userName  用户名
     * @param password  密码
     * @param clientId  客户端Id
     */
    public void creatConnect(String brokerUrl, String userName,
                             String password, String clientId, String topic) {
        // 获取默认的临时文件路径
        String tmpDir = System.getProperty("java.io.tmpdir");

        /*
         * MqttDefaultFilePersistence:
         * 将数据包保存到持久化文件中,
         * 在数据发送过程中无论程序是否奔溃、 网络好坏
         * 只要发送的数据包客户端没有收到,
         * 这个数据包会一直保存在文件中,
         * 直到发送成功为止。
         */
        // Mqtt的默认文件持久化
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
        try {
            // 构建包含连接参数的连接选择对象
            conOpt = new MqttConnectOptions();
            // 设置Mqtt版本
            conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            // 设置清空Session,false表示服务器会保留客户端的连接记录,true表示每次以新的身份连接到服务器
            conOpt.setCleanSession(false);
            // 设置会话心跳时间,单位为秒
            // 客户端每隔10秒向服务端发送心跳包判断客户端是否在线
            conOpt.setKeepAliveInterval(10);
            // 设置账号
            if (userName != null) {
                conOpt.setUserName(userName);
            }
            // 设置密码
            if (password != null) {
                conOpt.setPassword(password.toCharArray());
            }
            // 最后的遗言(连接断开时, 发动"close"给订阅了topic该主题的用户告知连接已中断)
            conOpt.setWill(topic, "close".getBytes(), 2, true);
            // 客户端是否自动尝试重新连接到服务器 
            conOpt.setAutomaticReconnect(true);
            // 创建MQTT客户端
            client = new MqttClient(brokerUrl, clientId, dataStore);
            // 设置回调
            client.setCallback(mCallback);
            // 连接
            doConnect();
     
        } catch (MqttException e) {
            Log.e("MqttManager", "creatConnect : " + e.toString());

        }

    }


    /**
     * 建立连接
     */
    public void doConnect() {
        if (client != null) {
            try {
                client.connect(conOpt);
            } catch (Exception e) {
                Log.e("MqttManager", "doConnect : " + e.toString());
            }
        }
    }


    /**
     * 发布消息
     *
     * @param topicName 主题名称
     * @param qos       质量(0,1,2)
     * @param payload   发送的内容
     */
    public void publish(String topicName, int qos, byte[] payload) {
        if (client != null && client.isConnected()) {
            // 创建和配置一个消息
            MqttMessage message = new MqttMessage(payload);
            message.setPayload(payload);
            message.setQos(qos);
            try {
                client.publish(topicName, message);
            } catch (MqttException e) {
                Log.e("MqttManager", "publish : " + e.toString());
            }
        }
    }


     public void publish(String topicName, int qos, String payload) {
        if (client != null && client.isConnected()) {
            // 创建和配置一个消息
            MqttMessage message = new MqttMessage(payload.getBytes);
            message.setPayload(payload.getBytes);
            message.setQos(qos);
            try {
                client.publish(topicName, message);
            } catch (MqttException e) {
                Log.e("MqttManager", "publish : " + e.toString());
            }
        }
    }


    /**
     * 订阅主题
     *
     * @param topicName 主题名称
     * @param qos      消息质量
     */
    public void subscribe(String topicName, int qos) {
        if (client != null && client.isConnected()) {
            try {
                client.subscribe(topicName, qos);
            } catch (MqttException e) {
                Log.e("MqttManager", "subscribe : " + e.toString());
            }
        }
    }


    /**
     * 取消连接
     */
    public static void disConnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
        }
    }


    /**
     * 判断是否连接
     */
    public static boolean isConnected() {
        return client != null && client.isConnected();
    }
}

4.2新建一个MqttCallbackBus类用于MQTT异步回调
我是使用自己封装的EventBus发送粘性事件的方式来处理回调的内容
对EventBus不熟悉可以使用Handler来处理

import com.zhanyun.key.eventbus.EventModel;
import com.zhanyun.key.eventbus.EventBusUtil;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author Ai(陈祥林)
 * @date 2018/1/3  10:45
 * @email Webb@starcc.cc
 */
public class MqttCallbackBus implements MqttCallback {

    /**
     * 连接中断
     */
    @Override
    public void connectionLost(Throwable cause) {
        Log.e("MqttManager", "cause : " + cause.toString());
        // 可在此方法内写重连的逻辑 
    }


    /**
     * 消息送达
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        Log.e("MqttManager", "topic : " + topic + "\t MqttMessage : " + message.toString());
        EventBusUtil.sendStickyEvent(new EventModel(10001, topic));
        EventBusUtil.sendStickyEvent(new EventModel(10010, message));
    }


    /**
     * 交互完成
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.e("MqttManager", "token : " + token.toString());
    }
}



5.使用
在Activity中调用

/**
* 第一个参数是服务器地址
* 第二个参数是用户名
* 第三个参数是密码
* 第四个参数是客户端ID
* 第五个参数是主题
**/
MqttManager
        .getInstance()
        .creatConnect(
              brokerUrl,
              userName,
              password,
              clientId, 
              topic);


运行后打印服务端返回的数据



6.总结
MQTT是基于TCP/IP的, 手机锁屏时会阻塞TCP, 导致MQTT中断, MqttConnectOptions设置isAutomaticReconnect()为true时可自动重连, 但多个相同的ClientID同时创建连接时会无限的连接中断和自动连接(注意)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,483评论 25 707
  • 每年的这个时候,伴着北方寒冷的天气,心情也开始变得复杂了。不是因为天气的原因,而是因为年终将至,新年将来,不由自主...
    生财有术升级认知阅读 210评论 0 0
  • 01 “戚本”是《红楼梦》众多版本中的一种,前面因有戚蓼生的序而得名,通常意义上又专指有正书局石印的大字本和小字本...
    雨如酥阅读 1,575评论 0 3