一、MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和制动器(比如通过Twitter让房屋联网)的通信协议。
主要特征:
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2.对负载内容屏蔽的消息传输;
3.使用TCP/IP 提供网络连接;
4.有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
二、Maven依赖
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
三、apollo服务器
- 配置文件(application.properties)
#MQTT配置信息
spring.mqtt.username=admin
spring.mqtt.password=password
## MQTT-服务器连接地址,如果有多个,用逗号隔开,
# 如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:61613
## MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=clientId
## MQTT-连接服务器默认服务端ID
spring.mqtt.server.id=serverId
## MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic
- 消息发送配置类
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 连接设置
* @return
*/
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
/**
* 客户端工厂
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* 发布通知
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
/**
* 发布通道为直连
* @return
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
- 消息发送接口
/**
* 消息发送接口
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {
void sendToMqtt(String data);
void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
消息发送接口,需要发送消息的时候直接调用就行了,提供了几个重载方法payload或者data是发送消息的内容
topic是消息发送的主题,这里可以自己灵活定义,也可以使用默认的主题,就是配置文件的主题,qos是mqtt 对消息处理的几种机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
当然,这三种模式下的性能肯定也不一样,qos=0是最好的,2是最差的 。
- 测试
@RestController
public class MqttController {
@Autowired
private MsgWriter msgWriter;
@RequestMapping("/send")
public String sendMqtt(String sendData){
msgWriter.sendToMqtt(sendData,"hello");
return "OK";
}
}
- 消息消费(本测试用的是同一项目,建议创建单独消费项目进行测试)
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConsumersConfig {
// 订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
@Value("${spring.mqtt.server.id}")
private String serverId;
/**
* 使用MqttSenderConfig中生成的工厂对象。
* 如果单独服务器,请使用以下@Bean代码。
*/
@Autowired
private MqttPahoClientFactory mqttClientFactory;
// @Bean
// public MqttPahoClientFactory mqttClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// factory.setServerURIs("tcp://localhost:61613");
// factory.setUserName("admin");
// factory.setPassword("password");
// return factory;
// }
/**
* consumer 订阅者监听消息
* @return
*/
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", 收到消息,来自MQTT")
.handle(logger())
.get();
}
/**
* 处理日志
* @return
*/
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
/**
* 订阅主题
* @return
*/
@Bean
public MessageProducerSupport mqttInbound() {
/**
* 订阅的主题可以指定,我订阅的是刚才发的too主题,还有订阅方的id 别和发送方的id 一样
*/
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
mqttClientFactory, "hello");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
}
注意:
主题名称生产者与消费者一定要对应,否则取不到消息 。
四、EMQ服务器
- 配置
# MQTT-密码
# MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:1883,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:1883
# MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=mqttId
# MQTT-连接服务器默认服务端ID
spring.mqtt.server.id=serverId
# MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic
spring.mqtt.username=admin
spring.mqtt.password=public
- 代码
与apollo服务器相同。 -
测试结果
测试方法同apollo服务器。