个人博客:haichenyi.com。感谢关注
整体介绍
最近公司用到的推送MQTT。不想过多的介绍背景什么的,我就直接讲怎么实现这个功能。
他这个原理长连接,这个不用多讲,用法类似于EventBus,需要先订阅,然后通过topic再发送消息。topic是什么呢?我先来讲讲整体流程:
先连接服务器,要先建立长连接
然后需要订阅topic,连接之后才能订阅topic
最后就是通过topic推送消息,接收消息
一步一步讲:
第一步,与服务器建立连接
先丢代码,然后看注释:
private void initPush() {
// 服务器地址(协议+地址+端口号)
String uri = host;
client = new MqttAndroidClient(this, uri, clientId);
// 设置MQTT监听并且接受消息
client.setCallback(mqttCallback);
//Mqtt的一些设置
conOpt = new MqttConnectOptions();
conOpt.setAutomaticReconnect(true);
// 清除缓存
conOpt.setCleanSession(true);
// 设置超时时间,单位:秒
conOpt.setConnectionTimeout(10);
// 心跳包发送间隔,单位:秒
conOpt.setKeepAliveInterval(20);
myTopic = String.format(TOPIC_SUB, mDeviceId);
Log.e(TAG,"myTopic_________"+myTopic);
doClientConnection();
}
上面的这些参数,我碰到了两个问题。
上面的这些参数,我碰到了两个问题。
上面的这些参数,我碰到了两个问题。
- 第一个问题,与服务器建立连接,你得先有一个服务器吧?我根据网上的步骤,创建了一个apache-apollo服务器,并且启动了,也启动成功了,我建立连接的时候,总是失败。然后,找啊找,找啊找。问题没有解决,但是,我找到了一个可以用的服务器,也就是这里的uri,不要设置MqttConnectOptions的用户名和密码,设置了他会拒绝
private String host = "tcp://test.mosquitto.org:1883";
- 第二个问题,我连接成功之后,不一会,他就会自动断开连接,或者,推送完消息之后,他就会断开连接。然后,网上搜原因,找啊找,诶,我找到了。MqttAndroidClient的构造方法:
/**
* Constructor - create an MqttAndroidClient that can be used to communicate with an MQTT server on android
*
* @param context
* object used to pass context to the callback.
* @param serverURI
* specifies the protocol, host name and port to be used to
* connect to an MQTT server
* @param clientId
* specifies the name by which this connection should be
* identified to the server
*/
public MqttAndroidClient(Context context, String serverURI,
String clientId) {
this(context, serverURI, clientId, null, Ack.AUTO_ACK);
}
看第三个参数,clientId,指定一个名字,用来连接服务器的身份标识。就是说,你设置的这个值,是你在服务器的唯一标识,不能跟其他用户的相同。我把这个clientId直接用uuid生成,就没问题了。
第二步,订阅topic
回到上面,接着往下面走,
/**
* 连接MQTT服务器
*/
private void doClientConnection() {
if (!client.isConnected() && isConnectIsNormal()) {
try {
client.connect(conOpt, null, iMqttActionListener);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 判断网络是否连接
*/
private boolean isConnectIsNormal() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
.getSystemService(Context.CONNECTIVITY_SERVICE);
if (connectivityManager != null) {
NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isAvailable()) {
String name = info.getTypeName();
Log.e(TAG, "MQTT当前网络名称:" + name);
return true;
} else {
Log.e(TAG, "MQTT 没有可用网络");
return false;
}
} else {
return false;
}
}
这个方法就是用来连接服务器的,首先判断是否正在连接,后面那个是判断当前有没有网络。再就是这个iMqttActionListener监听了
// MQTT是否连接成功
private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken arg0) {
Log.e(TAG, "连接成功 ");
try {
// 订阅myTopic话题
client.subscribe(myTopic, 0);
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void onFailure(IMqttToken arg0, Throwable arg1) {
Log.e(TAG, "连接失败");
arg1.printStackTrace();
// 连接失败,重连
}
};
讷,就是这里,你如果服务器有问题,他一直走onFailure方法。服务器连接成功之后,就是订阅topic。我来说说这个
client.subscribe(myTopic, 0);
首先,这个主题,是你自己跟服务器商量好的,随便什么都可以。为什么要订阅主题呢?我提前给你瞅瞅推送消息是怎么推送的
第二个参数,消息的类型qos,有三种:0、1、2
- 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
- 1代表“至少一次”,确保消息到达,但消息重复可能会发生
- 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
简单说明下,如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了。如果需要客户端保证能接收消息,需要指定QoS为1
client.publish(topic, new MqttMessage(msg.getBytes()));
讷,推送消息,是根据topic推送的,第二个参数,就是你要推送的具体消息。我个人认为,你可以理解成就类似于键值对的形式,
不同的用户可以订阅相同的主题
不同的用户可以订阅相同的主题
不同的用户可以订阅相同的主题
这个就是跟其他长连接不同的地方,底层,其实都一样,虽然我没有看底层的代码。想也想的到,服务器肯定是根据这个主题,去找对应的用户,然后推送消息。而其他的长连接就是直接指定用户。跑题了,跑题了。
第三步,推送、接收消息
当你连接服务器成功之后,就要推送消息了,我用的EventBus发的
private void publishData(String msg) {
String topic = myTopic;
try {
Log.e(TAG,"给__"+topic+"__topic发送的消息为:"+msg);
client.publish(topic, new MqttMessage(msg.getBytes()));
} catch (MqttException e) {
e.printStackTrace();
}
}
// MQTT监听并且接受消息
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.e(TAG,"接受到__"+topic+"__topic的消息为:"+new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
Log.e(TAG,"deliveryComplete");
}
@Override
public void connectionLost(Throwable arg0) {
// 失去连接,重连
Log.e(TAG,"失去连接");
}
};
当你的clientId重复的时候,他就会一直走connectionLost方法。到这里,基本上就讲完了,要注意的是,退出的时候,记得要释放资源
@Override
public void onDestroy() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
EventBus.getDefault().unregister(this);
super.onDestroy();
}
网上很多都是直接讲整体流程,重来不讲中间碰到的问题。难受