安卓物联网环境监控与设备控制实战

需求:安卓手机通过app远程监控卧室温湿度,控制电源开关。
流程概述:安卓手机通过MQTT协议服务器与ESP32(Arduino)开发板连接并进行数据交互

1. MTQQ协议

转载自(https://baijiahao.baidu.com/s?id=1608411516249221334&wfr=spider&for=pc

image

1.1 简介:

MQTT协议(Message Queuing Telemetry Transport),翻译过来就是遥信消息队列传输,是IBM公司于1999年提出的,现在最新版本是3.1.1。MQTT是一个基于TCP的发布订阅协议,设计的初始目的是为了极有限的内存设备和网络带宽很低的网络不可靠的通信,非常适合物联网通信。
MQTT的网络层级:


image

1.2 工作原理:

image

发布订阅示意图

如上图所示,客户端A连接到消息代理(message broker),消息代理返回确认消息。客户B发布消息温度25度,客户A订阅‘温度’,消息代理吧消息推给客户A,客户A发布温度20度,但客户B没有订阅,消息代理不推送。消息B又发布了温度38度,客户A就再次收到订阅的消息38度。最后客户端断开连接。整个过程非常简单清晰,容易理解。

1.3 MQTT消息的QOS

MQTT支持三种QOS等级:

QoS 0:“最多一次”,消息发布完全依赖底层 TCP/IP 网络。分发的消息可能丢失或重复。例如,这个等级可用于环境传感器数据,单次的数据丢失没关系,因为不久后还会有第二次发送。

QoS 1:“至少一次”,确保消息可以到达,但消息可能会重复。

QoS 2:“只有一次”,确保消息只到达一次。例如,这个等级可用在一个计费系统中,这里如果消息重复或丢失会导致不正确的收费。

1.4 MQTT的消息类型

1 CONNECT – 连接服务端:客户端到服务端的网络连接建立后, 客户端发送给服务端的第一个报文必须是CONNECT报文

2 CONNACK – 确认连接请求:服务端发送CONNACK报文响应从客户端收到的CONNECT报文。 服务端发送给客户端的第一个报文必须是CONNACK。如果客户端在合理的时间内没有收到服务端的CONNACK报文, 客户端应该关闭网络连接。合理的时间取决于应用的类型和通信基础设施。

3 PUBLISH – 发布消息:PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。

4 PUBACK –发布确认:PUBACK报文是对QoS 1等级的PUBLISH报文的响应。

5 PUBREC – 发布收到( QoS 2, 第一步):PUBREC报文是对QoS等级2的PUBLISH报文的响应。 它是QoS 2等级协议交换的第二个报文。

6 PUBREL – 发布释放( QoS 2, 第二步):PUBREL报文是对PUBREC报文的响应。 它是QoS 2等级协议交换的第三个报文。

7 PUBCOMP – 发布完成( QoS 2, 第三步):PUBCOMP报文是对PUBREL报文的响应。 它是QoS 2等级协议交换的第四个也是最后一个报文。

8 SUBSCRIBE - 订阅主题:客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅。 每个订阅注册客户端关心的一个或多个主题。 为了将应用消息转发给与那些订阅匹配的主题, 服务端发送PUBLISH报文给客户端。 SUBSCRIBE报文也( 为每个订阅) 指定了最大的QoS等级, 服务端根据这个发送应用消息给客户端。

9 SUBACK – 订阅确认:服务端发送SUBACK报文给客户端, 用于确认它已收到并且正在处理SUBSCRIBE报文。

10 UNSUBSCRIBE –取消订阅:客户端发送UNSUBSCRIBE报文给服务端, 用于取消订阅主题。

11 UNSUBACK – 取消订阅确认:服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文。

12 PINGREQ – 心跳请求:客户端发送PINGREQ报文给服务端的。 用于:1. 在没有任何其它控制报文从客户端发给服务的时, 告知服务端客户端还活着。2. 请求服务端发送 响应确认它还活着。3. 使用网络以确认网络连接没有断开。

13 PINGRESP – 心跳响应:服务端发送PINGRESP报文响应客户端的PINGREQ报文。 表示服务端还活着。

14 DISCONNECT –断开连接:DISCONNECT报文是客户端发给服务端的最后一个控制报文。 表示客户端正常断开连接。

1.5 MQTT控制报文格式

image

2. MQTT服务器搭建

MQTT服务器非常多,如apache的ActiveMQ,emtqqd,HiveMQ,Emitter,Mosquitto,Moquette等等
在这里我们使用apache-apollo-1.7.1作为MQTT服务器
运行环境:阿里云ubuntu1604
前往下载压缩包:(http://activemq.apache.org/apollo/download.html
apache-apollo-1.7.1-unix-distro.tar.gz
下载后拷贝到服务器,使用linux命令解压:

tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz

解压完成后进入文件夹

cd apache-apollo-1.7.1

新建broker(MQTT的服务器被称作broker,broker即一个MQTT服务器项目)

./bin/apollo create mybroker

配置
参考:(https://blog.csdn.net/qq_27109703/article/details/78789494

cd mybroker
vim etc/apollo.xml

image.png

部分配置按照上图修改,确保外网可以访问http监控页面(http://ip:61680),如果依然不能访问可能是因为以下问题:
ubuntu防火墙启用中(参考解决:https://www.cnblogs.com/EasonJim/p/7595213.html
阿里云安全组未配置61680与61613端口

vim etc/users.properties

配置用户名密码:

admin=admin

配置完成后即可启动服务器:

./bin/apollo-broker run
或
./bin/apollo-broker-service start
脚本参数:apollo-broker-service {start|stop|restart|force-stop|status}

启动后可以访问http监控页面(http://ip:61680),输入用户名密码即可进入监控。

image.png

3. Android端接入MQTT服务器

使用AndroidStudio新建项目
app的build.gradle:

//eclipse的mqtt协议开发包
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.1'

AndroidManifest.xml添加权限

    <uses-permission android:name="android.permission.INTERNET"/>
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>

Client.java:MQTT连接客户端类

package com.myhuanghai.mymqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;

public class Client {

    private static final String HOST = "tcp://47.104.142.113:61613";
    private static final String clientid = "android";
    private static final String userName = "admin";
    private static final String passWord = "admin";
    private HashMap<String, MqttTopic> topicList = new HashMap<>();


    void start(String[] publicTopics, String[] subscribeTopics, PushCallback pushCallback) {
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调
            client.setCallback(pushCallback);
            //MqttTopic topic = client.getTopic(TOPIC);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
            //options.setWill(topic, "close".getBytes(), 2, true);

            client.connect(options);
            //订阅消息
            int[] Qos = new int[subscribeTopics.length];
            for (int i=0;i<Qos.length;i++){
                Qos[i] = 1;
            }

            client.subscribe(subscribeTopics, Qos);
            for (String publicTopic : publicTopics) {
                topicList.put(publicTopic, client.getTopic(publicTopic));
            }


        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void publish(String topic, String message) throws MqttException, UnsupportedEncodingException {

        MqttMessage msg = new MqttMessage();
        msg.setQos(2);
        msg.setRetained(true);
        msg.setPayload(ByteUtils.stringToByte(message));
        MqttTopic mqttTopic = topicList.get(topic);
        MqttDeliveryToken token = mqttTopic.publish(msg);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    public static void main(String[] args) {
    }
}

PushCallback.java

package com.myhuanghai.mymqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 发布消息的回调类
 *
 * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
 * 在回调中,将它用来标识已经启动了该回调的哪个实例。
 * 必须在回调类中实现三个方法:
 *
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
 *
 *  public void connectionLost(Throwable cause)在断开连接时调用。
 *
 *  public void deliveryComplete(MqttDeliveryToken token))
 *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
 *  由 MqttClient.connect 激活此回调。
 *
 */
public interface PushCallback extends MqttCallback {

    void connectionLost(Throwable cause);

    void deliveryComplete(IMqttDeliveryToken token);

    void messageArrived(String topic, MqttMessage message) throws Exception;
}

byte工具类源码ByteUtils.java

package com.myhuanghai.mymqtt;


import java.io.UnsupportedEncodingException;

/**
 * Created by huang on 2017/6/30.
 */
public class ByteUtils {


    /**
     * string到字节数组的转换.
     */
    public static byte[] stringToByte(String str) throws UnsupportedEncodingException {
        return str.getBytes("UTF-8");
    }

    /**
     * 字节数组到String的转换.
     */
    public static String bytesToString(byte[] str) {
        String keyword = null;
        try {
            keyword = new String(str,"UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return keyword;
    }

}

activity_main.xml

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical"
    tools:context=".MainActivity">

    <TextView
        android:id="@+id/tv"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="正在连接"
        />
    <Button
        android:id="@+id/btn"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:text="电源:ON"/>

</LinearLayout>

MainActivity.java

package com.myhuanghai.mymqtt;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;

public class MainActivity extends AppCompatActivity {

    private TextView textView;
    private Button button;
    boolean state = true;//当前继电器开关状态

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        textView = findViewById(R.id.tv);
        button = findViewById(R.id.btn);

        final Client client = new Client();
        client.start(new String[]{"power"},new String[]{"temperature","state"},new PushCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        textView.setText("连接丢失");
                    }
                });
                cause.getCause().printStackTrace();
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }

            @Override
            public void messageArrived(final String topic, final MqttMessage message) throws Exception {
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        if (topic.equals("temperature")){
                            textView.setText(""+ByteUtils.bytesToString(message.getPayload()));
                        }else if (topic.equals("state")){
                            state = ByteUtils.bytesToString(message.getPayload()).equals("state:1");
                            button.setText("电源:"+(state?"on":"off"));
                        }
                    }
                });

            }
        });

        button.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                try {
                    client.publish("power",state?"0":"1");
                    state = !state;
                } catch (MqttException e) {
                    e.printStackTrace();
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

4. ESP32开发板接入MQTT

4.1 接线

微信图片_20190302182736.jpg
未命名文件.png

ESP使用Arduino兼容模式开发,开发请安装Arduino工具


image.png

4.2 开发板配置:

image.png

将ESP32开发板带的类库解压复制到Arduino目录下的hardware文件夹下并重启
Arduino代码使用以下类库:DHTesp.h、WiFi.h、WiFiClient.h、PubSubClient.h
下载类库:


image.png

4.3 Arduino代码:


#include <DHTesp.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>

#define DHT11PIN 18
#define RELAYPIN 27


const char* ssid = "ASUS";
const char* password = "jingai.love";
const char* mqtt_server = "47.104.142.113"; // 使用HIVEMQ 的信息中转服务
const char* mqtt_username = "admin";
const char* mqtt_password = "admin";
const char* sub_topic = "power";                     // 订阅信息主题
const char* pub_topic_1 = "temperature";                     // 发布信息主题
const char* pub_topic_2 = "state";                     // 发布信息主题
const char* client_id = "esp32";                   // 标识当前设备的客户端编号
int state = 0;

DHTesp dht;
WiFiClient espClient;                                                         // 定义wifiClient实例
PubSubClient client(espClient);                                         // 定义PubSubClient的实例
long lastMsg = 0;                                                               // 记录上一次发送信息的时长

void setup() {
  pinMode(RELAYPIN, OUTPUT);                               // 定义继电器输出引脚
  Serial.begin(115200); 
  dht.setup(DHT11PIN, DHTesp::DHT11);
  setup_wifi();                                                                    //执行Wifi初始化,下文有具体描述
  client.setServer(mqtt_server, 61613);                              //设定MQTT服务器与使用的端口,1883是默认的MQTT端口
  client.setCallback(callback);                                          //设定回调方式,当ESP8266收到订阅消息时会调用此方法
}

void setup_wifi() {

  delay(10);
  // 板子通电后要启动,稍微等待一下让板子点亮
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);   // 打印主题信息
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]); // 打印主题内容
  }
  Serial.println();

  if ((char)payload[0] == '1') {
    digitalWrite(RELAYPIN, HIGH);   // 亮灯
    state = 1;
  } else {
    digitalWrite(RELAYPIN, LOW);   // 熄灯
    state = 0;
  }
  //发布电源状态消息
  char pub2[20];
  sprintf(pub2, "state:%d",state);
  client.publish(pub_topic_2, pub2);
}

void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect(client_id,mqtt_username,mqtt_password)) {
      Serial.println("connected");
      // 连接成功时订阅主题
      client.subscribe(sub_topic);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void loop() {
  
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  long now = millis();
  if (now - lastMsg > 2000) {
    char pub1[20];
    TempAndHumidity lastValues = dht.getTempAndHumidity();
    Serial.println("Temperature: " + String(lastValues.temperature,0));
    Serial.println("Humidity: " + String(lastValues.humidity,0));
    lastMsg = now;
    sprintf(pub1, "Temperature:%f#Humidity:%f",lastValues.temperature,lastValues.humidity);
    client.publish(pub_topic_1, pub1);
  }
}

连接开发板并进行数据烧录


image.png

结果展示

image.png

点击电源按钮,继电器会会开关,并把状态返回给客户端
Github地址:https://github.com/FlyMantou/android_mqtt.git
部分资源百度云:链接:https://pan.baidu.com/s/1UmBtOiWXpVtTOVMi8gFhrA
提取码:bgpc
如果这篇文章能够帮到你,请帮我在Github点一个follow,一起学习,一起加油!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容