MQTT从搭建代理服务器到推送消息过程

MQTT简介:

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议!

MQTT消息的主要特点:

使用(publish/subscribe)消息模式,简称p/s模式,即发布/订阅!提供一对多的发送方式!
MQTT根据QoS定义的等级来传输消息:
  • level 0:最多一次的传输
消息是基于TCP/IP网络传输的。没有回应,在协议中也没有定义重传的语义。消息可能到达服务器1次,也可能根本不会到达。
  • level 1:至少一次的传输
服务器接收到消息会被确认,通过传输一个PUBACK信息。如果有一个可以辨认的传输失败,无论是通讯连接还是发送设备,还是过了一段时间确认信息没有收到,发送方都会将消息头的DUP位置1,然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。
如果客户端没有接收到PUBACK信息(无论是应用定义的超时,还是检测到失败然后通讯session重启),客户端都会再次发送PUBLISH信息,并且将DUP位置1。
当它从客户端接收到重复的数据,服务器重新发送消息给订阅者,并且发送另一个PUBACK消息。
  • level 2: 只有一次的传输
在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输,当重复的消息不被允许的情况下使用。这样增加了网络流量,但是它通常是可以接受的,因为消息内容很重要。
QoS level 2在消息头有Message ID。

接下来开始我们的表演:

下载代理服务器
创建代理服务器
  • 下载完成然后解压目录

  • 打开dos窗口进入到apache-apollo-1.7.1\bin目录下

  • 执行apollo create testbroker命令创建一个名称为testbroker的代理服务器

  • Paste_Image.png
  • 下面就是我们创建的代理服务器


    Paste_Image.png
启动代理服务器
  • 使用dos进入testbroker目录中的bin目录下

  • 执行apollo-broker run命令启动代理服务器

  • Paste_Image.png
通过HTTP访问代理服务器

现在我们可以打开浏览器看下我们的代理服务器
输入网址http://127.0.0.1:61680/

  • Paste_Image.png
  • 用户名密码可到配置文件中查看

  • 进入testbroker目录下的etc目录

Paste_Image.png
  • users.properties中配置的用户名和密码
Paste_Image.png
  • 默认有个用户名为admin,密码为password的用户

  • 我们也可以自己配置用户

  • 现在就用默认用户登陆

  • Paste_Image.png

OK登陆成功

接下来我们编写Android客户端

Paste_Image.png
一定保证客户端和服务端以及代理服务器所在的电脑在同一网段下
  • 可以在电脑上生成wifi热点,手机客户端连接热点即可
  • 接下来直接贴Android客户端代码
MqttService.java
package com.example.jingwc.mqtt_demo;

import android.app.Service;
import android.content.Intent;
import android.os.Binder;
import android.os.IBinder;
import android.util.Log;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttService extends Service {

    /**
     * 代理服务器ip地址
     */
    public static final String MQTT_BROKER_HOST = "tcp://192.168.1.107:61613";

    /**
     * 客户端唯一标识
     */
    public static final String MQTT_CLIENT_ID = "android-jingwc";

    /**
     * 订阅标识
     */
    public static final String MQTT_TOPIC = "jingwc";

    /**
     * 用户名
     */
    public static final String USERNAME = "admin";
    /**
     *  密码
     */
    public static final String PASSWORD = "password";

    private MqttClient mqttClient;

    public MqttService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        return binder;
    }

    /**
     * 连接mqtt
     */
    public void connect(){
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,
            // MemoryPersistence设置clientid的保存形式,默认为以内存保存
            mqttClient = new MqttClient(MQTT_BROKER_HOST,MQTT_CLIENT_ID,new MemoryPersistence());
            // 配置参数信息
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
            // 这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置用户名
            options.setUserName(USERNAME);
            // 设置密码
            options.setPassword(PASSWORD.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 连接
            mqttClient.connect(options);

            // 订阅
            mqttClient.subscribe(MQTT_TOPIC);

            // 设置回调
            mqttClient.setCallback(new MqttCallback() {
                //连接丢失后,一般在这里面进行重连
                @Override
                public void connectionLost(Throwable throwable) {
                    Log.d("test","connectionLost");
                }


                //subscribe后得到的消息会执行到这里面
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    Log.d("test","messageArrived"+mqttMessage.toString());
                }

                //publish后会执行到这里
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    Log.d("test","deliveryComplete");
                }
            });

        } catch (MqttException e) {
            e.printStackTrace();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 断开连接
     */
    public void disconnect(){
        if(mqttClient != null){
            if(mqttClient.isConnected()){
                try {
                    mqttClient.disconnect();
                    mqttClient = null;
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private final Binder binder = new MyBinder();

    class MyBinder extends Binder{

        public MqttService getService(){
            return MqttService.this;
        }
    }
}

MainActivity.java
package com.example.jingwc.mqtt_demo;

import android.content.ComponentName;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.IBinder;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;

public class MainActivity extends AppCompatActivity {

    MqttService service = null;

    private ServiceConnection mConnection = new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
            service = ((MqttService.MyBinder)iBinder).getService();
        }

        @Override
        public void onServiceDisconnected(ComponentName componentName) {
            service = null;
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Button bt_connect = (Button) findViewById(R.id.bt_connect);
        Button bt_disconnect = (Button) findViewById(R.id.bt_disconnect);

        bt_connect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // 连接
                service.connect();
            }
        });

        bt_disconnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // 断开连接
                service.disconnect();
            }
        });

        bindService(new Intent(this,MqttService.class),mConnection,BIND_AUTO_CREATE);
    }
}

服务端代码

  • 也可以在写一个android程序当作服务端
  • 我这里写的是java项目
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttServer {
    
    // 代理服务器ip地址
    private static String host = "tcp://192.168.1.107:61613";
    
    private static String userName = "admin";
    private static String password = "password";
    
    private static MqttClient client;
    
    // 主题
    private static MqttTopic topic;
    
    private static MqttMessage message;
    
    // 订阅标识
    private static String topicStr = "jingwc";
    
    public static void main(String[] args) throws MqttException{
        client = new MqttClient(host,"java-server-jingwc",new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(20);
        
        topic = client.getTopic(topicStr);
        
        message = new MqttMessage();
        message.setQos(1);
        message.setRetained(true);
        message.setPayload("from server message".getBytes());
        client.connect(options);
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("token:"+token.isComplete());
        
    }

}

服务端通过代理服务器发送客户端订阅的消息图

  • Paste_Image.png
Paste_Image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,831评论 25 707
  • MQTT简介 MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量...
    小熊_c37d阅读 11,243评论 0 3
  • 本来有一颗水晶般的玻璃心,现在被蹂躏成了盐,加点儿水稀释一下就是泪了
    你说我听好么阅读 236评论 0 0
  • 昨夜一场狂风暴雨,早上天气倒是晴朗起来,雨后空气湿润清爽,由不得心动。送下女儿回家,推出自行车,去转转。 雨后的一...
    清和qinghe阅读 196评论 2 2