创建连接
MainActivity
首先,我们需要在项目中引入 mqtt3.jar 包。
mqtt3 下载链接
package com.servermqtt;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.EditText;

import org.eclipse.paho.client.mqttv3.MqttException;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo screen with a "create" button that opens the MQTT connection and a
 * "send" button that publishes the text typed into the input field.
 *
 * <p>{@link ServerMQTT} uses Paho's blocking client, so every MQTT call is
 * handed to a single background thread instead of running on the UI thread.
 */
public class MainActivity extends AppCompatActivity {
    /**
     * Runs the blocking MQTT calls off the UI thread. A single worker keeps
     * "create" and "send" requests in the order the buttons were tapped.
     */
    private final ExecutorService mqttExecutor = Executors.newSingleThreadExecutor();

    /**
     * Current connection, or {@code null} until "create" has succeeded.
     * Only read and written on {@link #mqttExecutor}'s thread.
     */
    private ServerMQTT mServerMQTT;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final EditText msg = (EditText) findViewById(R.id.msg);

        findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // Views may only be touched on the UI thread, so capture the text here.
                final String text = msg.getText().toString().trim();
                mqttExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // "send" tapped before "create", or "create" failed: nothing to send with.
                        if (mServerMQTT == null) {
                            System.err.println("MQTT connection has not been created; message not sent");
                            return;
                        }
                        mServerMQTT.sendMessage(text);
                    }
                });
            }
        });

        findViewById(R.id.create).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                mqttExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            mServerMQTT = new ServerMQTT();
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        });
    }

    @Override
    protected void onDestroy() {
        // Let already-queued work finish, then allow the worker thread to exit.
        // NOTE(review): ServerMQTT exposes no disconnect/close method, so the
        // MQTT connection itself is not torn down here.
        mqttExecutor.shutdown();
        super.onDestroy();
    }
}
ServerMQTT
package com.servermqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Publisher side of the demo: connects to the MQTT broker at {@link #HOST}
 * and publishes text messages to {@link #TOPIC}. Any number of clients may
 * subscribe to that same topic to receive them.
 *
 * @author admin
 */
public class ServerMQTT {
    // tcp://<address of the machine running the MQTT broker>:<broker port>
    public static final String HOST = "tcp://192.168.10.67:61613";
    // Topic that messages are published to
    public static final String TOPIC = "topic11";
    // MQTT client id; can be specified in the MQTT service configuration
    private static final String clientid = "client1";
    // Payloads are encoded explicitly as UTF-8 rather than with the platform default charset.
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final MqttClient client;
    private MqttTopic topic11;
    // NOTE(review): credentials are hard-coded; move them to configuration before real use.
    private final String userName = "admin";
    private final String passWord = "password";

    /**
     * Creates the client and connects it to the broker.
     *
     * @throws MqttException if the client cannot be created or the connection
     *         to the broker fails
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence keeps the client's persisted state in memory only
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    /**
     * Connects to the broker and resolves the topic handle used for publishing.
     *
     * @throws MqttException if the connection attempt fails; propagated so the
     *         caller never receives an instance whose topic handle is unset
     */
    private void connect() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // Connection timeout
        options.setConnectionTimeout(10);
        // Keep-alive (heartbeat) interval
        options.setKeepAliveInterval(20);

        client.setCallback(new PushCallback());
        client.connect(options);
        topic11 = client.getTopic(TOPIC);
    }

    /**
     * Publishes a message to a topic and blocks until delivery has completed.
     *
     * @param topic   topic handle to publish to
     * @param message message to publish
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException if publishing fails
     */
    public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    /**
     * Publishes {@code msg} to {@link #TOPIC} as a retained QoS 1 message.
     * Publish failures are logged and not rethrown.
     *
     * @param msg text to send; a {@code null} value is logged and ignored
     */
    public void sendMessage(String msg) {
        if (msg == null) {
            System.err.println("sendMessage: ignoring null message");
            return;
        }
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(true);
            message.setPayload(msg.getBytes(PAYLOAD_CHARSET));
            publish(topic11, message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
PushCallback
package com.servermqtt; /**
*
* Description:
* @author admin
* 2017年2月10日下午18:04:07
*/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
* 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
* 由 MqttClient.connect 激活此回调。
*
*/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
}
<?xml version="1.0" encoding="utf-8"?>
<!-- Main screen: "create" button, "send" button and the message input field.
     Click handling is attached in code with setOnClickListener, so the buttons
     carry no android:onClick attribute (the activity has no onClick(View) method
     for it to resolve to). -->
<LinearLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical"
    >
    <!-- Creates the MQTT connection -->
    <Button
        android:text="创建"
        android:id="@+id/create"
        android:layout_width="match_parent"
        android:layout_height="50dp"/>
    <!-- Publishes the text currently in the input field -->
    <Button
        android:text="发送消息"
        android:id="@+id/send"
        android:layout_width="match_parent"
        android:layout_height="50dp"/>
    <!-- Message text to publish -->
    <EditText
        android:id="@+id/msg"
        android:layout_width="match_parent"
        android:layout_height="50dp"/>
</LinearLayout>