EMQ2.0消息服务器简介
EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。
五分钟下载启动EMQ
由于emqttd是用Erlang语言编写的,所以在开始之前,请确保已安装Erlang。
Erlang Windows下载安装
下载地址[http://www.erlang.org/downloads](http://www.erlang.org/downloads
安装完成后设置环境变量,手动编辑“path”加入路径 C:\Program Files\erl8.2\bin。
检查Erlang是否安装成功打,开 cmd ,输入 erl 后回车,如果看到如下的信息,表明安装成功。
EMQ下载地址
程序包下载后,可直接解压进入文件夹后启动运行,例如Mac平台
& 启动emqttd
./bin/emqttd start
& 检查运行状态
./bin/emqttd_ctl status
& 停止emqttd
./bin/emqttd stop
Windows平台
& 启动emqttd
bin\emqttd start
& 检查运行状态
bin\emqttd_ctl status
& 停止emqttd
bin\emqttd stop
Web管理控制台(Dashboard)
EMQ消息服务启动后,会默认加载Dashboard插件,启动Web管理控制台。用户可通过Web控制台,查看服务器运行状态、统计数据、客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)、插件(Plugin)。
控制台地址:http:127.0.0.1:18083,默认用户:admin,密码:public。
EMQ java客户端之mqtt-client
在这里使用maven构建一个Spring-Boot项目,本人选的版本为1.5.13。将下载下来的项目解压,导入IDEA中,在pom.xml中添加如下依赖
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
本文只介绍非阻塞式CallbackConnection的用法
@Component
public class EmqttdRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
MQTT mqtt=new MQTT();
mqtt.setHost("localhost",1883);
mqtt.setClientId("mengsu");
CallbackConnection connection=mqtt.callbackConnection();
connection.listener(new Listener() {
@Override
public void onConnected() {
}
@Override
public void onDisconnected() {
}
@Override
public void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Runnable ack) {
//当有设备向服务已订阅的主题发送消息时,该方法会消费
ack.run();
System.out.println(UTF8Buffer.encode(buffer.toString()));
}
@Override
public void onFailure(Throwable throwable) {
}
});
connection.connect(new Callback<Void>() {
@Override
public void onSuccess(Void aVoid) {
//连接成功后会默认订阅主题($client/mengsu)
System.out.println("连接成功");
}
@Override
public void onFailure(Throwable throwable) {
}
});
//新建一个device主题
Topic[] topic=new Topic[]{new Topic("device", QoS.AT_LEAST_ONCE)};
connection.subscribe(topic, new Callback<byte[]>() {
@Override
public void onSuccess(byte[] bytes) {
System.out.println("订阅成功");
}
@Override
public void onFailure(Throwable throwable) {
}
});
}
}
更多关于mqtt-client使用,请参考mqtt-client
EMQ提供了REST API
可用postman模拟发布 消息
post http://localhost:8080/api/v2/mqtt/publish
请求示例:
注解
emqtt需要进行Basic认证,模拟请求时,在postman中添加Basic Auth认证,账号:admin,密码:public。请求参数为json类型。