1 MQTT
1.1 MQTT介绍
1.1.1 简介
MQTT
全称(Message Queue Telemetry Transport
):一种基于发布/订阅(publish/subscribe
)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。
MQTT
是一种基于发布/订阅
模式的轻量级通讯协议,该协议构建在TCP/IP
协议上。 MQTT
最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽
占用的即时通讯协议,MQTT
在物联网、小型设备、移动应用等方面有广泛应用。
MQTT
协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ
有点类似。
TCP
协议位于传输层,MQTT
协议位于应用层,MQTT
协议构建于TCP/IP
协议上,也就是说只要支持TCP/IP
协议栈的地方,都可以使用MQTT
协议。
1.1.2 特点和应用
特点:
- 开放消息协议,简单易实现
- 发布订阅模式,一对多消息发布
- 基于
TCP/IP
网络连接,提供有序,无损,双向连接
- 2字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量
- 消息
QoS
支持,可靠传输保证
应用:
- 物联网M2M通信,物联网大数据采集
- Android消息推送,WEB消息推送
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
1.1.3 为什么要用 MQTT协议
MQTT
协议为什么在物联网(IOT
)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP
协议呢?
首先HTTP
协议它是一种同步协议
,客户端请求后需要等待服务器的响应。而在物联网(IOT
)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信
不稳定等,显然异步消息协议更为适合IOT
应用程序。
HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT
)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP
要实现这样的功能不但很困难,而且成本极高。
1.1.4 名词介绍
-
Publisher
(发布者):消息的发出者,负责发送消息 -
Subscriber
(订阅者):消息的订阅者,负责接收并处理消息 -
Broker
(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT
协议的消息中间件都可以充当 -
Topic
(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。 -
Payload
(负载);可以理解为发送消息的内容。 -
QoS
(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2
三个等级,下面分别介绍下:-
QoS 0
(Almost Once):至多一次,只发送一次,会发生消息丢失或重复; -
QoS 1
(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生; -
QoS 2
(Exactly Once):只有一次,确保消息只到达一次
-
1.2 MQTT控制报文的结构
MQTT
通过交换一些预定义的MQTT
控制报文来工作,每条MQTT
命令消息的消息头都包含一个固定的报头
,有些消息会携带一个可变报文头
和一个负荷
。消息格式如下:
固定包头,存在于所有MQTT控制包
可变包头,存在于某些MQTT控制包
载荷,存在于某些MQTT控制包
1.2.1 固定报文头(Fixed Header)
MQTT
固定报文头最少有两个字节
,第一个字节包含消息类型(Message Type)
和QoS级别
等标志位。第二个字节开始是剩余长度
字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。
剩余长度
使用了一种可变长度的结构来编码,这种结构使用单一字节
表示0-127
的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。
例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。
1.2.2 可变报文头(Variable Header)
可变报文头主要包含协议名
、协议版本
、连接标志(Connect Flags)
、心跳间隔时间(Keep Alive timer)
、连接返回码(Connect Return Code)
、主题名(Topic Name)
等
1.2.3 有效负荷和消息类型
有效负荷(Payload),可以理解为消息主题(body)
当 MQTT
发送的消息类型是 CONNECT
(连接)、PUBLISH
(发布)、SUBSCRIBE
(订阅)、SUBACK
(订阅确认)、则会带有负荷。
各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT
的消息类型(Message Type)(控制报文类型)
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
1.2.4 消息质量(QoS)
消息质量(QoS
):
-
QoS 0
:最多分发一次。消息的传递完全依赖于底层的TCP/IP
协议,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。
只会发送一次,不管成不成功 -
QoS 1
:至少分发一次。服务器的消息接收由PUBACK
消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。
未成功会继续发送,直到成功,可能会收到多次 -
QoS 2
:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。
未成功会继续发送,但会保证只收到一次
通过下面的例子可以更深刻的理解上面三个传输质量等级。
比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0
质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。
之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1
质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。
最后用户用完单车后,需要提交付款表单,可以使用QoS level 2
质量消息,这样确保只传递一次数据,否则用户就会多付钱了。
1.3 搭建MQTT服务
1.3.1 直接使用MQTT搭建
在Linux上搭建MQTT服务
打开EMQ官网:https://www.emqx.io/cn/products/broker
点击开始试用
选择服务器对应版本
复制下载命令到ssh工具中执行
下载完成
下载完成后执行安装命令
安装成功后执行命令:
sudo emqx start
出现以下信息表示启动成功
浏览器访问ip:18083
进入管理界面,默认账号为admin,密码为public
1.3.2 使用RabbitMQ搭建MQTT
点击了解 RabbitMQ 搭建
搭建完 RabbitMQ
后,启用 RabbitMQ
的 MQTT
插件了,默认是不启用的,使用如下命令开启即可:rabbitmq-plugins enable rabbitmq_mqtt
开启成功后,查看管理控制台,我们可以发现MQTT服务运行在1883端口上了。
如果使用
RabbitMQ与Web
端交互,那么底层使用的是 WebSocket
,所以需要开启 RabbitMQ
的 MQTT WEB
支持,使用如下命令开启即可:rabbitmq-plugins enable rabbitmq_web_mqtt
开启成功后,查看管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;
1.4 使用RabbitMQ的SpringBoot操作MQTT
1.4.1 pom.xml
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.4.2 配置文件
在application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息;
rabbitmq:
mqtt:
url: tcp://localhost:1883
username: guest
password: guest
defaultTopic: testTopic
编写一个Java配置类从配置文件中读取配置便于使用;
1.4.3 配置类
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ连接用户名
*/
private String username;
/**
* RabbitMQ连接密码
*/
private String password;
/**
* RabbitMQ的MQTT默认topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT连接地址
*/
private String url;
}
1.4.4 订阅类
添加MQTT消息订阅者相关配置,使用@ServiceActivator
注解声明一个服务激活器,通过MessageHandler来处理订阅消息;
/**
* MQTT消息订阅者相关配置
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理订阅消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
1.4.5 发布
添加MQTT消息发布者相关配置;
/**
* MQTT消息发布者相关配置
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
1.4.6 网关和测试
添加MQTT网关,用于向主题中发送消息;
/**
* MQTT网关,通过接口将数据传递到集成流
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送消息到默认topic
*/
void sendToMqtt(String payload);
/**
* 发送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送消息到指定topic并设置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
/**
* MQTT测试接口
*/
@Api(tags = "MqttController", description = "MQTT测试接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@ApiOperation("向默认主题发送消息")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@ApiOperation("向指定主题发送消息")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
1.5 不使用RabbitMQ的SpringBoot搭建提供端
1.5.1 pom.xml
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.5.2 修改配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: provider-id
#MQTT默认的消息推送主题,实际可在调用接口时指定
default:
topic: topic
server:
port: 8081
1.5.3 消息发布者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客户端对象
*/
private MqttClient client;
/**
* 客户端连接服务端
*/
public void connect(){
try {
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 发布消息
* @param qos 服务质量等级
* 0 只会发送一次,不管成不成功
* 1 未成功会继续发送,直到成功,可能会收到多次
* 2 未成功会继续发送,但会保证只收到一次
* @param retained 保留标志
* 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订阅这个主题时,会把消息推送给这个订阅者
* 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
*/
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题目的地,用于发布/订阅消息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一种机制来跟踪消息的传递进度。
//用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
MqttDeliveryToken token;
try {
//将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
//一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.5.4 消息发布客户端回调
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
public class MqttProviderCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 与服务器断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println(clientId + "与服务器断开连接");
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId() + "发布消息成功!");
}
}
1.5.5 创建控制器测试发布消息
import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos,retained,topic,message);
return "发送成功";
}catch (Exception e){
e.printStackTrace();
return "发送失败";
}
}
}
1.6 不使用RabbitMQ的SpringBoot搭建消费端
在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖
1.6.1 pom.xml
<!--mqtt相关依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.6.2 配置文件
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
client:
id: consumer-id
#MQTT默认的消息推送主题,实际可在调用接口时指定
default:
topic: topic
server:
port: 8082
1.6.3 消费者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客户端对象
*/
private MqttClient client;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客户端连接服务端
*/
public void connect(){
try {
//创建MQTT客户端对象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName(username);
//设置连接密码
options.setPassword(password.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
//设置回调
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//订阅主题
//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
int[] qos = {1,1};
//主题
String[] topics = {"topic1","topic2"};
//订阅主题
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 断开连接
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅主题
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.6.4 消息消费者客户端回调
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback {
/**
* 客户端断开连接的回调
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("与服务器断开连接,可重连");
}
/**
* 消息到达的回调
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主题 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息发布成功的回调
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
1.6.5 控制器提供手动建立连接和断开连接方法
import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "连接到服务器";
}
@RequestMapping("disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "与服务器断开连接";
}
}
1.7 测试提供端和消费端
1.7.1 使用三分工具
可以使用 MQTT
客户端来测试 MQTT
的即时通讯功能,这里使用的是 MQTTBox
这个客户端工具。
首先下载并安装好 MQTTBox
,下载地址:http://workswithweb.com/mqttbox.html
点击Create MQTT Client按钮来创建一个MQTT客户端;
接下来对MQTT客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可;
再配置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;
发布者向主题中发布消息,订阅者可以实时接收到
1.7.2 使用代码测试
分别启动两个项目,可以在管理界面看到创建的两个客户端
调用发布消息接口发布消息
消费者控制台打印
客户端断线消息恢复
把消费者与服务端断开连接
再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端
控制台没有东西打印
修改消费者客户端配置,把setCleanSession改为false
重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上