MQTT 整合 Spring Boot 的具体步骤与代码
-
1. pom 引入相关包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.14</version>
</dependency>
2. application.yml相关配置
# HTTP port of the application.
server:
  port: 8080

# MQTT settings; bound to MqttConfig through the "mqtt.config" prefix,
# so the keys below must be nested exactly two levels deep.
mqtt:
  config:
    # Credentials presented to the broker.
    username: admin
    password: public
    # Broker address.
    hostUrl: tcp://localhost:1883
    # Spring Boot random-value placeholder, resolved when the property is read.
    clientId: ${random.value}
    # Default topic used when publishing.
    pubTopic: pubTopic
    # Topic(s) to subscribe to; MqttServer splits this value on commas.
    subTopic: subTopic
- 3. MqttConfig配置类
package com.example.tdengine;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* mqtt配置类
*
* @author WeiWei
* @date 2022/08/08
*/
@Data
@ConfigurationProperties(prefix = "mqtt.config")
@Component
public class MqttConfig {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 主机url(代理端即 broker)
*/
private String hostUrl;
/**
* 客户机id
*/
private String clientId;
/**
* 发布的主题
*/
private String pubTopic;
/**
* 订阅的主题
*/
private String subTopic;
}
- 4. MqttServer类
package com.example.tdengine;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Objects;
/**
* mqtt服务类
*
* @author WeiWei
* @date 2022/08/08
*/
@Slf4j
@Configuration
public class MqttServer{
/**
* 出站通道
*/
public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
/**
* 输入通道
*/
public static final String INPUT_CHANNEL = "mqttInputChannel";
/**
* mqtt配置
*/
@Resource
private MqttConfig mqttConfig;
/**
* 初始化
*/
@PostConstruct
public void init() {
log.info(mqttConfig.toString());
}
/**
* mqtt客户工厂
*
* @return {@link MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory clientFactory() {
MqttConnectOptions options = new MqttConnectOptions();
//配置MqttConnectOptions
options.setServerURIs(new String[]{mqttConfig.getHostUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
/**
* mqtt出站通道
*
* @return {@link MessageChannel}
*/
@Bean(value = OUTBOUND_CHANNEL)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* mqtt出站handler
*
* @return {@link MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler mqttOutboundHandler() {
//MqttPahoMessageHandler初始化
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttConfig.getClientId(), clientFactory());
//设置默认的qos级别
handler.setDefaultQos(1);
//保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用
handler.setDefaultRetained(false);
//设置发布的主题
handler.setDefaultTopic(mqttConfig.getPubTopic());
//当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
handler.setAsync(false);
//当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。
handler.setAsyncEvents(false);
return handler;
}
/**
* mqtt输入通道
*
* @return {@link MessageChannel}
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 入站
*
* @return {@link MessageProducer}
*/
@Bean
public MessageProducer inbound() {
//配置订阅端MqttPahoMessageDrivenChannelAdapter
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
mqttConfig.getClientId() + "_inbound", clientFactory(), mqttConfig.getSubTopic().split(","));
//设置超时时间
adapter.setCompletionTimeout(3000);
//设置默认的消息转换类
adapter.setConverter(new DefaultPahoMessageConverter());
//设置qos级别
adapter.setQos(1);
//设置入站管道
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 消息处理程序
*
* @return {@link MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler messageHandler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("订阅主题为: {}", topic);
String[] topics = mqttConfig.getSubTopic().split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("接收到该主题消息为: {}", message.getPayload().toString());
}
}
};
}
}
- 5. MqttGateway接口
package com.example.tdengine;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway for the publishing side: each call is turned into a
 * message and sent to the channel named by {@link MqttServer#OUTBOUND_CHANNEL}.
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttServer.OUTBOUND_CHANNEL)
public interface MqttGateway {

    /**
     * Sends a payload without naming a topic or QoS. The outbound handler in
     * {@link MqttServer} is configured with a default topic and a default QoS,
     * which are expected to apply in that case.
     *
     * @param payload message body
     */
    void sendToMqtt(String payload);

    /**
     * Sends a payload to an explicit topic.
     *
     * @param topic   destination topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload to an explicit topic with an explicit QoS.
     *
     * @param topic   destination topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     quality of service, passed as the {@link MqttHeaders#QOS} header
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}