针对 MQTT 的常用平台,推荐对应的第三方包如表 1 所示:
https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.559.3a7d3d26JKCSDc
Android Eclipse Paho SDK https://github.com/eclipse/paho.mqtt.android
如下图:
使用MQTT使用到的几点
1、topic管理(也就是消息主题)
2、生产者管理(消息的发送者,也就是服务端发送的数据)
3、消费者管理(消息的接收者,也就是移动端接收的数据)
4、实例管理
5、Group Id管理
附上图片:
以上这些一般都是后台人员进行创建,但是在测试阶段,移动端开发人员也会自己去创建一个测试的实例进行测试使用,所以此部分也是需要了解的,具体创建参考官方文档即可。
自行写DEMO工程:
根据官方教程:加上以下依赖:
repositories {
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
dependencies {
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
添加权限,注册service
<service android:name=".mqtt.service.MqttService" />
然后自行写一个后台服务去进行MQTT服务连接。
在github上下载下来的项目包含三个文件,如图所示:
1、paho.mqtt.android.example 这个是我们需要使用到的接收消息的demo
2、org.eclipse.paho.android.service 这个是我们接收使用到的service的 library
3、org.eclipse.paho.android.sample 这个是模拟发送的demo(这里我没有使用,而是使用后台的发送代码或者官网上生产者那里的发送进行发送消息,就不做说明了)
具体代码可参考:paho.mqtt.android.example
下面对修改后的参数进行说明一下:
1、serverUri 服务器的域名加端口 例如:tcp://xxx:1883 因为不能泄露一些数据,所以使用xxx代替了,具体的地址则是在MQTT管理----》实例管理下的接入点域名,复制的接入点域名即是xxx了,注意,tcp还是需要的哦,接入点域名只是xxx的那一部分。
2、groupId groupId可在 MQTT管理----》Group Id管理中获取到,这个id是需要自己创建的。
3、topic 主题(注意:主题是需要一致的)
4、clientId 客户端id,clientId 是由groupId +@@@+设备名。设备名可以随便取,但是客户端id有长度限制,具体长度限制可查看官方文档。
5、accessKey 与secretKey 这两也是需要自己创建,在创建完成后需要保存好。
以上代码能进行接收的主要区别是以下这段代码:
String sign = null;
try {
sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
mqttConnectOptions.setUserName(accessKey);
mqttConnectOptions.setPassword(sign.toCharArray());
mqttConnectOptions.setCleanSession(cleanSession);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions默认构造器属性,userName和password找后台要。
代码掠影:
Service里代码:
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
init();
return super.onStartCommand(intent, flags, startId);
}
/**
* MQTT服务连接
*/
public void connectToNet(){
MqttConnectOptions mqttConnectOptions =new MqttConnectOptions();
String sign =null;
try {
sign =macSignature(clientId, secretKey);
}catch (InvalidKeyException e) {
e.printStackTrace();
}catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
mqttConnectOptions.setUserName("Signature|" +accessKey +"|" +instanceId);
mqttConnectOptions.setPassword(sign.toCharArray());
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
try {
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
DisconnectedBufferOptions disconnectedBufferOptions =new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e("yyyy","onFailure:"+exception.getMessage());
}
});
}catch (MqttException ex){
ex.printStackTrace();
}
}
/**
* @param text 要签名的文本
* @param secretKey 阿里云MQ secretKey
* @return 加密后的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static StringmacSignature(String text, String secretKey)throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm ="HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
public void subscribeToTopic(){
try {
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.e("yyyy","subscriptionTopic is onsuccess");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e("yyyy","subscriptionTopic is onFailure");
}
});
// THIS DOES NOT WORK!
mqttAndroidClient.subscribe(subscriptionTopic, 0, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message)throws Exception {
// message Arrived!
Log.e("yyyy","subscriptionTopic message Arrived"+ message.toString());
}
});
}catch (MqttException ex){
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
@Override
public void onDestroy() {
try {
mqttAndroidClient.disconnect(); //断开连接
}catch (MqttException e) {
e.printStackTrace();
}
super.onDestroy();
}
参数配置:
private StringserverUri ="tcp://*************************";
private StringclientId ="GID_DEVICE_PUSH@@@device_sn_1";
StringaccessKey ="**************";
StringsecretKey ="**********************";
StringinstanceId ="mqtt-cn-v0h14cnqw02";
final StringsubTopic ="/mq4Iot";
final StringsubscriptionTopic ="DEVICE_PUSH_MQ" +subTopic;
private static MqttAndroidClientmqttAndroidClient;
以上,即可,更多度娘!!!!!