1. 关于 MQTT 生产端
/**
 * Spring Integration configuration for the MQTT producer (outbound) side.
 *
 * <p>Declares the Paho client factory, the outbound message handler and the
 * channel ({@code mqttOutboundChannel}) the handler consumes from. All
 * connection settings are read from {@link MqttProperties#getOutbound()}.
 */
@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Builds the Paho client factory used by the outbound handler.
     *
     * <p>The configured URL property is a comma-separated list, for example
     * {@code tcp://192.168.10.100:1883,tcp://host2:1883}. Whitespace around
     * the commas is ignored so that {@code "a, b"} does not yield a URI with
     * a leading space.
     *
     * @return the client factory with server URIs set and clean-session disabled
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(splitCommaSeparated(mqttProperties.getOutbound().getUrls()));
        factory.setCleanSession(false);
        // NOTE(review): no credentials are configured here; if the broker requires
        // authentication, set them on the factory (setUserName / setPassword).
        return factory;
    }

    /**
     * Creates the handler that publishes every message arriving on
     * {@code mqttOutboundChannel}.
     *
     * <p>The handler is put in async mode and falls back to the configured
     * default topic; see the Spring Integration MQTT reference for the exact
     * semantics of both settings.
     *
     * @return the outbound MQTT message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttProperties.getOutbound().getClientId(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());
        return messageHandler;
    }

    /**
     * Channel that producers send to; consumed by {@link #mqttOutbound()}.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Splits a comma-separated property value, discarding whitespace around
     * each comma and at both ends of the value.
     *
     * @param value the raw property value; must not be {@code null}
     * @return the individual entries in their original order
     */
    private static String[] splitCommaSeparated(String value) {
        return value.trim().split("\\s*,\\s*");
    }
}
2. 关于 MQTT 消费端
/**
 * Spring Integration configuration for the MQTT consumer (inbound) side.
 *
 * <p>Declares the channel adapter that subscribes to the configured topics,
 * the channel it writes to ({@code mqttInputChannel}) and a handler that logs
 * each received payload. All connection settings are read from
 * {@link MqttProperties#getInbound()}.
 */
@Configuration
@Slf4j
public class MqttInboundConfiguration {

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Channel that receives messages from the inbound adapter.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the message-driven adapter that subscribes to the broker.
     *
     * <p>The topics property is a comma-separated list. Whitespace around the
     * commas is ignored so that {@code "a, b"} subscribes to {@code a} and
     * {@code b} rather than to a topic whose name starts with a space.
     *
     * @return the inbound adapter, wired to {@link #mqttInputChannel()}
     */
    @Bean
    public MessageProducer inbound() {
        String[] inboundTopics =
                mqttProperties.getInbound().getTopics().trim().split("\\s*,\\s*");
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getInbound().getUrl(),
                mqttProperties.getInbound().getClientId(),
                inboundTopics);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handler for messages arriving on {@code mqttInputChannel}; logs the
     * payload at INFO level.
     *
     * @return the logging message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // Parameterized form: same output for String payloads, and no
                // ClassCastException if the converter ever delivers another type.
                log.info("{}", message.getPayload());
            }
        };
    }
}
3. MQTT 使用
/**
 * Messaging gateway for publishing to MQTT.
 *
 * <p>Every method sends its payload to the channel named
 * {@code mqttOutboundChannel} (the gateway's default request channel). The
 * overloads differ only in which MQTT headers they attach to the message.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
 * Sends a payload without setting a topic header.
 * NOTE(review): presumably published to the default topic configured on the
 * outbound handler - confirm against the handler bean.
 *
 * @param data the message payload
 */
void sendToMqtt(String data);
/**
 * Sends a payload with the {@code MqttHeaders.TOPIC} header set.
 *
 * @param topic value for the topic header
 * @param payload the message payload
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
 * Sends a payload with both the {@code MqttHeaders.TOPIC} and
 * {@code MqttHeaders.QOS} headers set.
 *
 * @param topic value for the topic header
 * @param qos value for the QoS header
 * @param payload the message payload
 */
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
Created on 2018-04-09