1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.2.8.RELEASE</version>
</dependency>
2.程序入口
/**
 * Application entry point: boots the Spring context with this class as the
 * primary configuration source.
 */
@SpringBootApplication
public class MqttApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Same bootstrap as the static SpringApplication.run(Class, String[]) helper,
        // spelled out as construct-then-run.
        SpringApplication application = new SpringApplication(MqttApplication.class);
        application.run(args);
    }
}
3.配置类
#application.yaml
spring:
  mqtt:
    username: admin
    password: Pub2020lic
    url: tcp://111.23.140.59:10029
    qos: 2
/**
 * MQTT settings bound from the {@code spring.mqtt.*} properties, plus the
 * {@link SimpleMqttClient} bean that is connected using them.
 */
@Slf4j
@Data
@ConfigurationProperties(prefix = "spring.mqtt", ignoreInvalidFields = true)
@Configuration
public class MqttConfig {
    /** Broker user name ({@code spring.mqtt.username}). */
    private String username;
    /** Broker password ({@code spring.mqtt.password}); may be absent. */
    private String password;
    /** Broker URI ({@code spring.mqtt.url}), e.g. {@code tcp://host:port}. */
    private String url;
    /** QoS value read from {@code spring.mqtt.qos}; not used inside this class. */
    private Integer qos;

    /**
     * Builds Paho connect options from the bound properties.
     *
     * <p>{@code cleanSession} is set to {@code false}, so the server does not clear the
     * session when the client drops offline; after reconnecting, the client can receive
     * messages for its previously subscribed topics, including those sent while it was
     * offline. Set it to {@code true} if earlier messages should be discarded after a
     * disconnect.
     *
     * @return connect options with a 10 s connection timeout, a 20 s keep-alive interval
     *         and automatic reconnect enabled
     */
    public MqttConnectOptions getOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(username);
        // The password is optional in the configuration; calling toCharArray() on a
        // missing value would throw a NullPointerException.
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setServerURIs(new String[]{url});
        options.setConnectionTimeout(10);
        options.setAutomaticReconnect(true);
        // Keep-alive (heartbeat) interval, in seconds.
        options.setKeepAliveInterval(20);
        return options;
    }

    /**
     * Creates the {@link SimpleMqttClient} bean and starts its connection to the broker.
     *
     * <p>{@code SimpleMqttClient.connect} handles its own failures instead of throwing,
     * so the log line below marks bean creation, not a confirmed broker connection.
     *
     * @return the client wrapper
     */
    @Bean
    public SimpleMqttClient simpleMqttClient() {
        SimpleMqttClient simpleMqttClient = new SimpleMqttClient();
        // Connect to the broker (EMQ). The client id carries the current time, so it
        // differs on every start.
        // NOTE(review): the client connects with cleanSession=false; a client id that
        // changes on each start presumably prevents the broker from resuming the
        // previous session - confirm whether a stable id is wanted.
        simpleMqttClient.connect(url, "mqttclient" + System.currentTimeMillis(), username, password);
        log.info("create MqttPushClient success!");
        return simpleMqttClient;
    }
}
4.Mqtt客户端
@Slf4j
public class SimpleMqttClient {
@Autowired
private IMqttService mqttService;
//全局唯一 单例
private static IMqttAsyncClient client;
private static IMqttAsyncClient getClient() {
return client;
}
private static void setClient(IMqttAsyncClient client) {
SimpleMqttClient.client = client;
}
/**
* 连接MQTT服务器
*/
public void connect(String serverURI, String clientID, String username, String password) {
IMqttAsyncClient client = null;
try {
client = new MqttAsyncClient(serverURI, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setServerURIs(new String[]{serverURI});
options.setConnectionTimeout(100);
options.setAutomaticReconnect(true);
//设置心跳
options.setKeepAliveInterval(30);
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean b, String s) {
log.info("重连成功!");
subscribe(MqttConfig.TOPIC_CAR_UP, 0);
//publish("BUS_TS_REPLY_TOPIC_湘AB7182", "7E81074000010000000060013608025900000F7E", 0, false);
}
@Override
public void connectionLost(Throwable throwable) {
log.error("Lost connection!!! {}");
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("接收消息主题 : " + topic);
log.info("接收消息Qos : " + mqttMessage.getQos());
log.info("接收消息内容 : " + BaseConvert.bytesToHexString(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.debug("send success ? --> {}, {}", iMqttDeliveryToken.isComplete(), count++);
}
});
try {
client.connect(options);
SimpleMqttClient.setClient(client);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(String topic, byte[] message, int qos, boolean retained) {
if(client != null && client.isConnected()) {
try {
IMqttDeliveryToken token = client.publish(topic, message, qos, retained);
token.waitForCompletion();
log.debug("Is the message sent successfully? --> {}, {}", token.isComplete());
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
log.info("开始订阅主题: {}" , topic);
if (client != null && client.isConnected()) {
try {
IMqttToken token = client.subscribe(topic, qos);
//token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
log.error("subscribe topic {} qos {} error!", topic, qos);
}
}
}
/**
* 订阅多主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String[] topics, int[] qos) {
log.info("开始订阅主题集合:{}", Arrays.asList(topics));
if (client != null && client.isConnected()) {
try {
IMqttToken token = client.subscribe(topics, qos);
//token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
log.error("subscribe topic {} qos {} error!", topics, qos);
}
}
}
}