库:Paho,这个的社区比较活跃
安装java_paho
没有使用maven,所以手动下载的jar包
根据需要下载下载地址:https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/
我下载的是org.eclipse.paho.client.mqttv3 -1.2.2版本
代码实现
///服务器地址
String broker = "tcp://localhost:1883";
///clientid在同一个emqx服务器必须唯一
String clientId = "tn_link2";
///本地消息的持久化实例
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker:" + broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
String topic = "pubdata";
System.out.println("Subscribe to topic:" + topic);
//订阅主题
sampleClient.subscribe(topic);
//设置了一个回调实例,消息转发过来的时候调用该实例的方法
sampleClient.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) throws Exception {
String theMsg = MessageFormat.format("{0} topic is {1}", new String(message.getPayload(),"utf-8"),topic);
System.out.println(theMsg);
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectionLost(Throwable throwable) {
}
});
String content = "Message from MqttPublishSample";
int qos = 2;
System.out.println("Publishing message:" + content);
MqttMessage message = new MqttMessage(content.getBytes());
//可以指定也可不指定
message.setQos(qos);
//发布消息
sampleClient.publish(topic, message);
System.out.println("Message published");
} catch (MqttException me) {
System.out.println("reason" + me.getReasonCode());
System.out.println("msg" + me.getMessage());
System.out.println("loc" + me.getLocalizedMessage());
System.out.println("cause" + me.getCause());
System.out.println("excep" + me);
me.printStackTrace();
}
经测试是可以连接到emqx服务器