Eclipse Paho:是Eclipse提供的一个访问MQTT服务器的一种开源客户端库。
Eclipse目前提供十种不同语言平台的客户端类库,
对于Java平台而言和MQTT服务器交互的开源框架还有很多, 例如:
Eclipse Paho Java、 Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等等...
但是, 根据GIthub上使用次数来讲Eclipse Paho无疑是主流, 就个人使用而已, Eclipse Paho集成非常方便、简单。
对MQTT协议不是很了解的可以看一下
Eclipse Paho 官网
Paho.client.mqttv3 在线API
注意:
QoS:服务质量,其作用是当网络过载或拥塞时,QoS 能确保重要业务量不受延迟,同时保证网络的高效运行
- 服务质量0 - 表示一条消息最多应该发送一次(零次或一次)。该消息不会持久保存到磁盘,并且不会通过网络进行确认。这种QoS是最快的,但只能用于无价值的消息
- 服务质量1 - 表示一条消息应至少传递一次(一次或多次)。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。该消息将通过网络得到确认。这是默认的QoS。
- 服务质量2 - 表示应该传递一次消息。该消息将被保存到磁盘,并且将受到整个网络的两阶段确认。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。
如果未配置持久性,则在发生网络或服务器问题时,仍会传送QoS 1和2消息,因为客户端将在内存中保持状态。如果MQTT客户端关闭或失败,并且未配置持久性,则由于客户端状态将丢失,因此无法保持QoS 1和2消息的传输。
publish: 向服务器上的主题发布消息, 主要用于即时通讯(IoT、 聊天...)
subscribe: 订阅主题, 服务器通过你订阅的主题向你发布消息, 这样就能实现服务器向APP推送信息了
ClientId: 客户端ID应该是唯一的, 多个用户同时使用同一个ID创建连接会不断挤线互踢导致连接不断中断, 服务器也是通过ID来区分不同用户的操作
开始使用Eclipse Paho
1. 配置maven库
repositories {
...
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
...
}
2.添加依赖
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0' // MQTT Eclipse paho
3.添加权限
<uses-permission android:name="android.permission.WAKE_LOCK"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.INTERNET"/>
4.编写逻辑代码
4.1新建一个MqttManager类, 用于MQTT操作的管理
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
/**
* @author Ai(陈祥林)
* @date 2018/1/3 10:37
* @email Webb@starcc.cc
*/
public class MqttManager {
private static MqttManager mInstance = null;
/**
* Mqtt回调
*/
private MqttCallback mCallback;
/**
* Mqtt客户端
*/
private static MqttClient client;
/**
* Mqtt连接选项
*/
private MqttConnectOptions conOpt;
private MqttManager() {
mCallback = new MqttCallbackBus();
}
public static MqttManager getInstance() {
if (null == mInstance) {
synchronized (MqttManager.class) {
if (mInstance == null) {
mInstance = new MqttManager();
}
}
}
return mInstance;
}
/**
* 释放单例, 及其所引用的资源
*/
public static void release() {
try {
if (mInstance != null) {
disConnect();
mInstance = null;
}
} catch (Exception e) {
Log.e("MqttManager", "release : " + e.toString());
}
}
/**
* 创建Mqtt 连接
*
* @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
* @param userName 用户名
* @param password 密码
* @param clientId 客户端Id
*/
public void creatConnect(String brokerUrl, String userName,
String password, String clientId, String topic) {
// 获取默认的临时文件路径
String tmpDir = System.getProperty("java.io.tmpdir");
/*
* MqttDefaultFilePersistence:
* 将数据包保存到持久化文件中,
* 在数据发送过程中无论程序是否奔溃、 网络好坏
* 只要发送的数据包客户端没有收到,
* 这个数据包会一直保存在文件中,
* 直到发送成功为止。
*/
// Mqtt的默认文件持久化
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// 构建包含连接参数的连接选择对象
conOpt = new MqttConnectOptions();
// 设置Mqtt版本
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
// 设置清空Session,false表示服务器会保留客户端的连接记录,true表示每次以新的身份连接到服务器
conOpt.setCleanSession(false);
// 设置会话心跳时间,单位为秒
// 客户端每隔10秒向服务端发送心跳包判断客户端是否在线
conOpt.setKeepAliveInterval(10);
// 设置账号
if (userName != null) {
conOpt.setUserName(userName);
}
// 设置密码
if (password != null) {
conOpt.setPassword(password.toCharArray());
}
// 最后的遗言(连接断开时, 发动"close"给订阅了topic该主题的用户告知连接已中断)
conOpt.setWill(topic, "close".getBytes(), 2, true);
// 客户端是否自动尝试重新连接到服务器
conOpt.setAutomaticReconnect(true);
// 创建MQTT客户端
client = new MqttClient(brokerUrl, clientId, dataStore);
// 设置回调
client.setCallback(mCallback);
// 连接
doConnect();
} catch (MqttException e) {
Log.e("MqttManager", "creatConnect : " + e.toString());
}
}
/**
* 建立连接
*/
public void doConnect() {
if (client != null) {
try {
client.connect(conOpt);
} catch (Exception e) {
Log.e("MqttManager", "doConnect : " + e.toString());
}
}
}
/**
* 发布消息
*
* @param topicName 主题名称
* @param qos 质量(0,1,2)
* @param payload 发送的内容
*/
public void publish(String topicName, int qos, byte[] payload) {
if (client != null && client.isConnected()) {
// 创建和配置一个消息
MqttMessage message = new MqttMessage(payload);
message.setPayload(payload);
message.setQos(qos);
try {
client.publish(topicName, message);
} catch (MqttException e) {
Log.e("MqttManager", "publish : " + e.toString());
}
}
}
public void publish(String topicName, int qos, String payload) {
if (client != null && client.isConnected()) {
// 创建和配置一个消息
MqttMessage message = new MqttMessage(payload.getBytes);
message.setPayload(payload.getBytes);
message.setQos(qos);
try {
client.publish(topicName, message);
} catch (MqttException e) {
Log.e("MqttManager", "publish : " + e.toString());
}
}
}
/**
* 订阅主题
*
* @param topicName 主题名称
* @param qos 消息质量
*/
public void subscribe(String topicName, int qos) {
if (client != null && client.isConnected()) {
try {
client.subscribe(topicName, qos);
} catch (MqttException e) {
Log.e("MqttManager", "subscribe : " + e.toString());
}
}
}
/**
* 取消连接
*/
public static void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
client.disconnect();
}
}
/**
* 判断是否连接
*/
public static boolean isConnected() {
return client != null && client.isConnected();
}
}
4.2新建一个MqttCallbackBus类用于MQTT异步回调
我是使用自己封装的EventBus发送粘性事件的方式来处理回调的内容
对EventBus不熟悉可以使用Handler来处理
import com.zhanyun.key.eventbus.EventModel;
import com.zhanyun.key.eventbus.EventBusUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @author Ai(陈祥林)
* @date 2018/1/3 10:45
* @email Webb@starcc.cc
*/
public class MqttCallbackBus implements MqttCallback {
/**
* 连接中断
*/
@Override
public void connectionLost(Throwable cause) {
Log.e("MqttManager", "cause : " + cause.toString());
// 可在此方法内写重连的逻辑
}
/**
* 消息送达
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e("MqttManager", "topic : " + topic + "\t MqttMessage : " + message.toString());
EventBusUtil.sendStickyEvent(new EventModel(10001, topic));
EventBusUtil.sendStickyEvent(new EventModel(10010, message));
}
/**
* 交互完成
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.e("MqttManager", "token : " + token.toString());
}
}
5.使用
在Activity中调用
/**
* 第一个参数是服务器地址
* 第二个参数是用户名
* 第三个参数是密码
* 第四个参数是客户端ID
* 第五个参数是主题
**/
MqttManager
.getInstance()
.creatConnect(
brokerUrl,
userName,
password,
clientId,
topic);
6.总结
MQTT是基于TCP/IP的, 手机锁屏时会阻塞TCP, 导致MQTT中断, MqttConnectOptions设置isAutomaticReconnect()为true时可自动重连, 但多个相同的ClientID同时创建连接时会无限的连接中断和自动连接(注意)