<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:background="@color/colorPrimary"
android:layout_height="match_parent">
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="接受:"
android:textColor="#fff"
android:id="@+id/tvrece"
android:textSize="30sp"/>
<TextView
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:textColor="#fff"
android:textSize="30sp"
android:id="@+id/tv"
android:layout_toRightOf="@id/tvrece"
/>
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="发送:"
android:id="@+id/tvsend"
android:textColor="#fff"
android:layout_below="@id/tvrece"
android:textSize="30sp"/>
<EditText
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:textColor="#fff"
android:textSize="30sp"
android:id="@+id/tv1"
android:layout_toRightOf="@id/tvsend"
android:layout_below="@id/tv"
/>
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_below="@id/tv1"
android:id="@+id/btnsend"
android:text="发送"/>
</RelativeLayout>
package com.example.app1;
import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MainActivity_my extends Activity {
MqttClient client;
MqttConnectOptions options;
ScheduledExecutorService scheduler;
Handler handler;
String myTopic="a/b/c";
String myTopic1="a/b";
TextView tv,tv1;
Button btnsend;
String ms;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_my);
tv=(TextView)findViewById(R.id.tv);
tv1=(TextView)findViewById(R.id.tv1);
btnsend=(Button)findViewById(R.id.btnsend);
btnsend.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
MqttMessage message=new MqttMessage();
ms=tv1.getText().toString();
message.setPayload(ms.getBytes());
try {
client.publish(myTopic1,message);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
initMQTT();
handler=new Handler(){
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
if (msg.what==1){
tv.setText((String)msg.obj);
Toast.makeText(MainActivity_my.this,(String)msg.obj,Toast.LENGTH_SHORT).show();
}else if(msg.what==2){
Toast.makeText(MainActivity_my.this, "连接成功", Toast.LENGTH_SHORT).show();
try {
client.subscribe(myTopic, 1);
} catch (Exception e) {
e.printStackTrace();
}
}else if(msg.what==3){
Toast.makeText(MainActivity_my.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
}
}
};
startReconnect();
}
private void startReconnect() {
scheduler= Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(!client.isConnected()){
connect();
}
}
},0*1000,10*1000, TimeUnit.MILLISECONDS);
}
private void initMQTT() {
try {
client = new MqttClient("tcp://192.168.221.60:1883", "T001",new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName("admin");
// 设置连接的密码
options.setPassword("password".toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
// TODO Auto-generated method stub
//subscribe后得到的消息会执行到这里面
System.out.println("messageArrived----------");
System.out.println("========================"+message.toString());
Message msg = new Message();
//Toast.makeText(MainActivity.this,message.toString(),Toast.LENGTH_SHORT);
msg.what = 1;
// msg.obj = topicName+"---"+message.toString();
msg.obj=message.toString();
handler.sendMessage(msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
//publish后会执行到这里
System.out.println("deliveryComplete---------" + token.isComplete());
}
@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
//连接丢失后,一般在这里面进行重连
System.out.println("connectionLost----------");
}
});
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// 连接mqtt
private void connect() {
new Thread(new Runnable() {
@Override
public void run() {
try {
client.connect(options);
Message msg = new Message();
msg.what = 2;
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 3;
handler.sendMessage(msg);
}
}
}).start();
}
}