老蒋的MQTT学习之路

说实话从第三家公司听过和设备间通信是用MQTT的,但是实际上工作几年真没怎么用过,刚好最近做一个边缘盒子就用到了,本来打算用华为开源的KubeEdge的,但是华为的东西一贯坑老多而且没有Java的demo,最后还是用EMQX和mqtt实现了,设备内部的通信,特此记录。

首先大概介绍一下MQTT,你可以把它大致看做一种MQ中间件和Kafka那种差不多,但是MQTT其实一开始是美国IBM公司为卫星通信开发的一种工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,全称MQTT(消息队列遥测传输)。然后我工作过的几家和设备相关的公司里其实大部分同事在和设备通信的过程中一般是建议用MQTT的,当然也有相当一部分为了方便和实时性而选择websocket(我本人是很不建议这么干的,因为websocket的应用场景根本不是为设备间通信设计的,websocket实际上更加适用于前后端交互尤其是大屏上使用,因为它很占服务器资源,一台8核16G的服务器根本撑不住几万个websocket链接,而且它还是有状态的,不方便集群拓展,虽然也有广播等集群拓展方式,但总的来说我个人觉得它不是设备通信的最优解,MQTT才是最优解)。

MQTT作为一个消息中间件那么也有客户端(client)和服务端(broker),这里选几个热门开源的 MQTT Broker,其中部分项目提供商业支持,做简单选型对比。


1.png

2.png

3.png

4.png

这里我用的是EMQ,因为它功能特性较为丰富,社区活跃度也较高,当然用Mosquitto的也比较多,华为的KubeEdge其实用的就是Mosquitto,毕竟Mosquitto比较轻量。
yml文件配置:

mqtt:
  #连接地址:写死宿主机的dockerIP 
  address: tcp://172.17.0.1:31883
 #客户端id:随便填,没有会自动生成
  clientId: box_test
#用户名
  username: emqx_test
#密码
  password: emqx_test_password
#会话心跳时间
  keepAlive: 60
#超时时间
  timeout: 60
#订阅的topic
  subtopic: BOX/TEST

mqtt配置Java类:

@Component
@Data
@ConfigurationProperties("mqtt")
public class MqttConfig {
    /**
     * 连接地址
     */
    private String address;

    /**
     * 客户端id
     */
    private String clientId;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 会话心跳时间
     */
    private int keepAlive;

    /**
     * 超时时间
     */
    private int timeout;

    /**
     * 订阅的消息
     */
    private String subTopic;
}

mqtt消息格式:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MqttMessage<T> implements Serializable {
    /**
     * 行为(自定义枚举)
     *
     * @see MessageAction
     */
    private String action;
    /**
     * 消息类型(自定义枚举)
     *
     * @see MessageType
     */
    private String type;

    /**
     * 消息数据
     */
    private T message;
}

mqtt回调:

@Component
@Slf4j
public class Callback implements MqttCallback {
    
    @Resource
    private MqttConfig mqttConfig;

    @Resource
    private MqttClient mqttClient;

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("控制中心mqtt断连:", cause);
        mqttClient.connect();
    }

    /**
     * 发送消息,消息到达后处理方法
     *
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发送完成,messageId={},topics={}", messageId, Arrays.toString(topics));
    }

    /**
     * 订阅主题接收到消息处理方法
     *
     * @param topic
     * @param message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // 根据 topic 名称获得 deviceId
        String deviceId = MqttTopic.getDeviceIdByTopic(topic);
        MqttClient mqttClient = BeanGetter.getBean(MqttClient.class);
        String request = new String(message.getPayload());
        log.info("接受到的主题:{}, 接受到的消息:{}, messageId:{}, qos:{}", topic, request, message.getId(), message.getQos());
        MessageRequest messageRequest = JSON.parseObject(request, MessageRequest.class);
        MessageAction action = messageRequest.getAction();
        MessageType type = messageRequest.getType();
        Object obj = messageRequest.getMessage();
        switch (action) {
            case NOTIFY:
                switch (type) {
                    case CONTROL:
                        //do some thing
                        break;
                    case SEARCH_DEVICE_STATUS:
                        //do some thing
                        break;
                    default:
                        break;
                }
                break;
            case INVITE:
                //do some thing
                break;
            default:
                break;
        }
    }
}

mqtt客户端:

@Component
@Slf4j
public class MqttClient {

    public org.eclipse.paho.client.mqttv3.MqttClient cMqttClient;

    @Resource
    private Callback callback;


    public boolean sendMessage(String topic, MqttMessage message) {
        try {
            cMqttClient.publish(topic, message);
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("服务响应包发送失败 ", e);
            return false;
        }
        return true;
    }

    public org.eclipse.paho.client.mqttv3.MqttClient getClient() {
        return cMqttClient;
    }
/**
*无参连接mqtt broker
*/
@PostConstruct
    public void connect() {
        org.eclipse.paho.client.mqttv3.MqttClient client;
        //连接mqtt服务器
        try {
            client = new org.eclipse.paho.client.mqttv3.MqttClient(mqttConfig.getAddress(), mqttConfig.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttConfig.getUsername());
            options.setPassword(mqttConfig.getPassword().toCharArray());
            options.setCleanSession(true);
            client.setCallback(innerCallback);
            log.info("正在mqtt服务器建立连接: {}", mqttConfig.getAddress());
            client.connect(options);
            log.info("完成mqtt服务器的连接: {}", mqttConfig.getAddress());
            mqttClient = client;
            //订阅主题
            try {
                client.subscribe(mqttConfig.getSubTopic());
                log.info("订阅主题成功 inner subscribe: {}", mqttConfig.getSubTopic());
            } catch (MqttException e) {
                log.error("订阅主题失败 inner subscribe fail: {}", mqttConfig.getSubTopic(), e);
            }
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("连接mqtt服务器失败请检查配置与inner-mqtt服务器状态");
        }
    }

    public void reconnect() {
        connect();
    }

/**
* mqtt 消费确认
*/
    public void manualMessageArrivedComplete(Integer messageId, Integer qos) {
        try {
            cMqttClient.messageArrivedComplete(messageId, qos);
            log.info("messageId = {},qos = {} ,消息确认成功!", messageId, qos);
        } catch (MqttException e) {
            log.error("消息确认失败 messageId={}, qos={}", messageId, qos);
        } catch (NullPointerException e) {
            log.error("messageId or qos 为空");
        }
    }

/**
*有参连接mqtt broker
*/
    public void mqttClientConnect(CheckMqttResponse checkMqttResponse) {
        if(cMqttClient != null){
            try {
                cMqttClient.disconnect();
                //强制取消之前的控制中心mqtt订阅
                cMqttClient.close(true);
            } catch (MqttException e) {
                log.error("控制中心取消订阅失败: ", e);
            }
        }
        //要订阅的topics
        List<String> topics = new ArrayList<>();
        topics.add(String.format("DEVICE/CLIENT/%s", checkMqttResponse.getClientId()));
        topics.add(String.format("DEVICE/SERVER/%s", checkMqttResponse.getClientId()));
        org.eclipse.paho.client.mqttv3.MqttClient client;
        //连接mqtt服务器
        try {
            client = new org.eclipse.paho.client.mqttv3.MqttClient(checkMqttResponse.getBroker(), checkMqttResponse.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(checkMqttResponse.getUsername());
            options.setPassword(checkMqttResponse.getPassword().toCharArray());
            options.setCleanSession(true);
            client.setCallback(callback);
            log.info("正在与mqtt服务器建立连接: {}", checkMqttResponse.getBroker());
            client.connect(options);
            log.info("完成与mqtt服务器的连接: {}", checkMqttResponse.getBroker());
            cMqttClient = client;
            //订阅主题
            try {
                for (String topic : topics) {
                    log.info("订阅主题:{}", topic);
                    cMqttClient.subscribe(topic, 1);
                }
            } catch (MqttException e) {
                log.error("订阅主题失败: ", e);
            }
        } catch (MqttException e) {
            log.error("连接mqtt服务器失败请检查配置与控制中心mqtt服务器状态", e);
        }
    }
}

断线重连定时任务(每隔2秒检测一次连接状态):

@Component
@Slf4j
public class MqttReconnectRunner {
    @Resource
    private MqttClient mqttClient;

   

    @Scheduled(fixedDelay = 2000)
    public void reconnect() {
        MqttClient cMqttClient = mqttClient.getClient();
        if (cMqttClient != null) {
            //mqtt断联
            if (!cMqttClient.isConnected()) {
                log.info("mqtt断连,重试中...");
                mqttClient.connect();
            }
        } else {
            //mqtt未初始化
            mqttClient.connect();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容