Paho -物联网 MQTT C Cient的实现和详解

作者:阿进的写字台

出处:https://www.cnblogs.com/homejim/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

概述

  在文章Paho - MQTT C Cient的实现中,我介绍了如何使用Paho开源项目创建MQTTClient_pulish客户端。但只是简单的介绍了使用方法,而且客户端的结果与之前介绍的并不吻合,今天我就结合新的例子,给大家讲解一下Paho使用MQTT客户端的主要过程。如同前面介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构还是如同步客户端中介绍的:

1.创建一个客户端对象;

2.设置连接MQTT服务器的选项;

3.如果多线程(异步模式)操作被使用则设置回调函数(详见 Asynchronous >vs synchronous client

applications);

4.订阅客户端需要接收的任意话题;

5.重复以下操作直到结束:

    a.发布客户端需要的任意信息;

    b.处理所有接收到的信息;

6.断开客户端连接;

7.释放客户端使用的所有内存。

实现

好,直接上代码,MQTT简单的同步客户端。

#include

<pthread.h>

#include

<stdio.h>

#include

<stdlib.h>

#include

<string.h>

#include

"MQTTClient.h"

#if

!defined(WIN32)

#include

<unistd.h>

#else

#include

<windows.h>

#endif


#define

NUM_THREADS 2

#defineADDRESS     "tcp://localhost:1883"//更改此处地址

#defineCLIENTID    "aaabbbccc_pub"//更改此处客户端ID

#defineSUB_CLIENTID    "aaabbbccc_sub"//更改此处客户端ID

#defineTOPIC       "topic01"  //更改发送的话题

#definePAYLOAD     "Hello Man, Can you seeme ?!"//

#defineQOS         1

#defineTIMEOUT     10000L

#defineUSERNAME    "test_user"

#definePASSWORD    "jim777"

#defineDISCONNECT  "out"


intCONNECT = 1;

volatileMQTTClient_deliveryTokendeliveredtoken;


void delivered(void*context, MQTTClient_deliveryToken dt)

{

    printf("Message

with token value %d delivery confirmed\n", dt);

    deliveredtoken = dt;

}


int msgarrvd(void *context, char *topicName, inttopicLen, MQTTClient_message *message)

{

    inti;

    char* payloadptr;


    printf("Message

arrived\n");

    printf("     topic: %s\n", topicName);

    printf("   message: ");


    payloadptr = message->payload;

    if(strcmp(payloadptr, DISCONNECT) == 0){

        printf(" \n out!!");

        CONNECT = 0;

    }


    for(i=0; ipayloadlen; i++)

    {

        putchar(*payloadptr++);

    }

    printf("\n");


    MQTTClient_freeMessage(&message);

    MQTTClient_free(topicName);

    return1;

}


void connlost(void *context, char*cause)

{

    printf("\nConnection

lost\n");

    printf("     cause: %s\n", cause);

}


void *subClient(void*threadid){

   longtid;

   tid = (long)threadid;

   printf("Hello

World! It's me, thread #%ld!\n", tid);


    MQTTClient client;

    MQTTClient_connectOptions conn_opts =MQTTClient_connectOptions_initializer;

    intrc;

    intch;


    MQTTClient_create(&client, ADDRESS,SUB_CLIENTID,

        MQTTCLIENT_PERSISTENCE_NONE,NULL);

    conn_opts.keepAliveInterval = 20;

    conn_opts.cleansession = 1;

    conn_opts.username = USERNAME;

    conn_opts.password = PASSWORD;


    MQTTClient_setCallbacks(client,NULL, connlost, msgarrvd,delivered);


    if((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)

    {

        printf("Failed to connect, return code

%d\n",rc);

        exit(EXIT_FAILURE);

    }

    printf("Subscribing

to topic %s\nfor client %s using QoS%d\n\n"

           "Press Q<Enter> to

quit\n\n",TOPIC, CLIENTID, QOS);

    MQTTClient_subscribe(client, TOPIC, QOS);


    do

    {

        ch = getchar();

    }while(ch!='Q' && ch != 'q');


    MQTTClient_unsubscribe(client, TOPIC);

    MQTTClient_disconnect(client, 10000);

    MQTTClient_destroy(&client);


   pthread_exit(NULL);

}

void *pubClient(void*threadid){

   longtid;

   tid = (long)threadid;

   intcount = 0;

   printf("Hello

World! It's me, thread #%ld!\n", tid);

//声明一个MQTTClient

    MQTTClient client;

    //初始化MQTT Client选项

    MQTTClient_connectOptions conn_opts =MQTTClient_connectOptions_initializer;

    //#define MQTTClient_message_initializer { {'M', 'Q',

'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }

    MQTTClient_message pubmsg =MQTTClient_message_initializer;

    //声明消息token

    MQTTClient_deliveryToken token;

    intrc;

    //使用参数创建一个client,并将其赋值给之前声明的client

    MQTTClient_create(&client, ADDRESS, CLIENTID,

        MQTTCLIENT_PERSISTENCE_NONE,NULL);

    conn_opts.keepAliveInterval = 20;

    conn_opts.cleansession = 1;

    conn_opts.username = USERNAME;

    conn_opts.password = PASSWORD;

     //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS

    if((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)

    {

        printf("Failed to connect, return code

%d\n",rc);

        exit(EXIT_FAILURE);

    }

    pubmsg.payload = PAYLOAD;

    pubmsg.payloadlen =strlen(PAYLOAD);

    pubmsg.qos = QOS;

    pubmsg.retained = 0;

    while(CONNECT){

    MQTTClient_publishMessage(client, TOPIC,&pubmsg, &token);

    printf("Waiting

for up to %d seconds for publication of %s\n"

            "on topic %s for client with

ClientID: %s\n",

            (int)(TIMEOUT/1000), PAYLOAD, TOPIC,CLIENTID);

    rc = MQTTClient_waitForCompletion(client,token, TIMEOUT);

    printf("Message

with delivery token %d delivered\n", token);

    usleep(3000000L);

    }



    MQTTClient_disconnect(client, 10000);

    MQTTClient_destroy(&client);

}

int main(int argc, char* argv[])

{

    pthread_tthreads[NUM_THREADS];

    longt;

    pthread_create(&threads[0],NULL, subClient, (void*)0);

    pthread_create(&threads[1],NULL, pubClient, (void*)1);

    pthread_exit(NULL);

}

  在代码中,我创建了两个线程,分别用来处理订阅客户端和发布客户端。

整体详解

接下来我讲解一下这个简单的客户端,其中,大体的流程如下:


大体的流程如图所示,在客户端启动之后,会启动线程,创建一个订阅客户端,它会监听消息的到达,在消息到达之后会触发相应的回调函数以对消息进行处理;后在启动一个线程,创建一个发送客户端,用来发送消息的,每次发送消息之前会判断是否要掉线,如CONNECT=0则会掉线,否则发送消息给topic01。

订阅客户端详解

以下函数完成的是订阅的功能。

void *subClient(void *threadid)

过程大概如下:

  第一步:声明客户端,并通过函数给其赋值;

MQTTClient client;

MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

  第二步:设置连接MQTT服务器的选项;

MQTTClient_connectOptions conn_opts =

MQTTClient_connectOptions_initializer;

  第三步:设置回调函数;

MQTTClient_setCallbacks(client, NULL, connlost,msgarrvd, delivered);

//相应的回调函数connlost,msgarrvd,delivered我的代码中都有

  第四步:使用客户端和连接选项连接服务器;

MQTTClient_connect(client, &conn_opts))

  第五步订阅话题;

MQTTClient_subscribe(client, TOPIC, QOS);

  第六步一直等待,知道输入'Q' 或'q';

    do

    {

        ch = getchar();

    }while(ch!='Q' && ch != 'q');

  第六步一直等待,直到输入'Q' 或'q';

    do

    {

        ch = getchar();

    }while(ch!='Q' && ch != 'q');

  第七步取消订阅;

MQTTClient_unsubscribe(client, TOPIC);

  第八步.断开客户端连接;

 MQTTClient_disconnect(client, 10000);

  第九步.释放客户端使用的所有内存;

MQTTClient_destroy(&client);

  至此,订阅客户端就结束了。一般订阅客户端的大体结构都是这样。不同的是回调函数的个性化上。

发送客户端详解

  以下函数完成的是发送的功能。

void *pubClient(void *threadid)

过程大概如下:

  第一步:声明客户端,并通过函数给其赋值;

MQTTClientclient;

MQTTClient_create(&client,

ADDRESS, CLIENTID,

MQTTCLIENT_PERSISTENCE_NONE, NULL);

  第二步:设置连接MQTT服务器的选项;

MQTTClient_connectOptions conn_opts =

MQTTClient_connectOptions_initializer;

  第三步:使用客户端和连接选项连接服务器;

MQTTClient_connect(client, &conn_opts)

  第四步设置发送消息的属性;

    pubmsg.payload = PAYLOAD;

    pubmsg.payloadlen =strlen(PAYLOAD);

    pubmsg.qos = QOS;

    pubmsg.retained = 0;

  第五步循环发送消息;

   MQTTClient_publishMessage(client, TOPIC, &pubmsg,

&token);

  第六步一直等待,当CONNECT=0时退出该客户端;

  第七步.断开客户端连接;

    MQTTClient_disconnect(client, 10000);

  第八步.释放客户端使用的所有内存;

 MQTTClient_destroy(&client);

  至此,发送客户端就结束了。一般的发送客户端大体结构也如此,但异步客户端可能有些许不同,无非就是设计回调函数,然后在连接,断开连接等时可以使用回调函数做一些操作而已,具体的可以自己研究。

  为了让大家能够更深入了解,我把自己学到的一些函数和结构体大致在下面讲解了一下。

相关结构体

MQTTClient

定义:typedef void* MQTTClient;

含义:代表MQTT客户端的句柄。成功调用MQTTClient_create()后,可以得到有效的客户端句柄。

MQTTClient_connectOptions

定义:

typedef struct

{

char struct_id[4];//结构体的识别序列,必须为MQTC

int struct_version;//结构体版本

/**

在0,1,2,3,4,5中取值:

0-表示没有SSL选项且没有serverURIs;

1-表示没有serverURIs;

2-表示没有MQTTVersion

3-表示没有返回值;

4-表示没有二进制密码选项

*/

intkeepAliveInterval;

/**

在这段时间内没有数据相关的消息时,客户端发送一个非常小的MQTT“ping”消息,服务器将会确认这个消息

*/

intcleansession;

/**

当cleansession为true时,会话状态信息在连接和断开连接时被丢弃。 将cleansession设置为false将保留会话状态信息

*/

intreliable;

/*

将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另一个消息

*/

MQTTClient_willOptions*will;

/*

如果程序不使用最后的意愿和遗嘱功能,请将此指针设置为NULL。

*/

const char* username;//用户名

const char* password;//密码

int connectTimeout;//允许尝试连接的过时时间

int retryInterval;//尝试重连的时间

MQTTClient_SSLOptions*ssl;

/*

如果程序不使用最后的ssl,请将此指针设置为NULL。

*/

intserverURIcount;


char* const* serverURIs;

/*

连接服务器的url,以protocol:// host:port为格式

*/

intMQTTVersion;

/*

MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4)

*/

struct

{

const char* serverURI;  

intMQTTVersion;    

intsessionPresent; 

}returned;

  struct{

  intlen;           

const void* data; 

}binarypwd;

}MQTTClient_connectOptions;

含义:用来设置MQTTClient的连接选项的结构体。

MQTTClient_message

定义:

typedef struct

{

    char struct_id[4];//结构体的识别序列,必须为MQTM

    int struct_version;//结构体的版本,必须为0

    int payloadlen;//MQTT信息的长度

    void* payload;//指向消息负载的指针

    int qos;//服务质量

    int retained;//保留标志

    int dup;dup//标志指示这个消息是否是重复的。 只有在收到QoS1消息时才有意义。如果为true,则客户端应用程序应采取适当的措施来处理重复的消息。

    int msgid;//消息标识符通常保留供MQTT客户端和服务器内部使用。

}

MQTTClient_message;

含义:代表MQTT信息的结构体。

相关函数详解

MQTTClient_create

定义:

DLLExportint MQTTClient_create(   

        MQTTClient *    handle,

        const char*   serverURI,

        const char*   clientId,

        int    persistence_type,

        void*     persistence_context

    )

作用:该函数创建了一个用于连接到特定服务器,使用特定持久存储的MQTT客户端。

参数含义

handle指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充

serverURI以空结尾的字符串,其指定客户端将连接到的服务器。其格式为protocol://host:port。现在的(protocol)协议必须是tcp或ssl,而host可以指定为IP地址或域名。例如, 要使用默认 MQTT 端口连接到本地计算机上运行的服务器, 请指定为 tcp://localhost:1883。

clientId客户端标识符(clientId)是一个以空结尾的 UTF-8 编码字符串,客户端连接到服务器时将它传递过去。

persistence_type客户端所使用的持久类型。MQTTCLIENT_PERSISTENCE_NONE-使用内存持久化。如果客户端运行的设备或系统出故障或关闭, 则任何正在运行的消息的当前状态都将丢失, 甚至在 QoS1 和 QoS2 中也可能无法传递某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用默认的持久化机制(文件系统)。正在运行消息的状态被保存在持久存储中,以便在意外出现时对消息的丢失提供一些保护; MQTTCLIENT_PERSISTENCE_USER-使用程序指定的持久化实现。使用这种类型,应用程序可对持久化机制进行控制,应用程序必须实现MQTTClient_persistence 接口。

persistence_context如果应用程序使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,该参数不使用,而且值应该设置为NULL。对于MQTTCLIENT_PERSISTENCE_DEFAULT持久化,应该设置持久化目录的位置(如果设置为NULL,则使用工作目录作为持久化目录)。使用MQTTCLIENT_PERSISTENCE_USER持久化,则将此参数指向有效的MQTTClient_persistence结构。

MQTTClient_setCallbacks

定义:

DLLExportintMQTTClient_setCallbacks   (  

        MQTTClient      handle,

        void*     context,

        MQTTClient_connectionLost *     cl,

        MQTTClient_messageArrived *     ma,

        MQTTClient_deliveryComplete *   dc

    )  

作用:该函数为特定的客户端创建回调函数。如果您的客户端应用程序不使用特定的回调函数,请将相关参数设置为NULL。 调用MQTTClient_setCallbacks()使客户端进入多线程模式。 任何必要的消息确认和状态通信都在后台处理,而不需要客户端应用程序的任何干预。

注意:在调用该函数时,MQTT客户端必须断开连接。(即先要调用该函数在连接客户端)。

|

参数 | 含义|

| ---|-------------|

| handle |

指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充|

| context|

指向任何应用程序特定上下文的指针。 上下文指针被传递给每个回调函数,以提供对回调中的上下文信息的访问。|

|cl|

指向MQTTClient_connectionLost()回调函数的指针。 如果您的应用程序不处理断开连接,您可以将其设置为NULL。|

|ma|

指向MQTTClient_messageArrived()回调函数的指针。 当您调用MQTTClient_setCallbacks()时,必须指定此回调函数。|

|dc|

指向MQTTClient_deliveryComplete()回调函数的指针。 如果您的应用程序同步发布,或者您不想检查是否成功发送,则可以将其设置为NULL。|

MQTTClient_connect

定义:

DLLExportintMQTTClient_connect    (  

        MQTTClient      handle,

        MQTTClient_connectOptions *     options

    )      

作用:此函数尝试使用指定的选项将先前创建的客户端连接到MQTT服务器。

参数含义

handle指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充

options指向有效的MQTTClient_connectOptions结构的指针。

返回值

0连接成功

1拒绝连接:不可接受的协议版本。

2拒绝连接:标识符被拒绝。

3拒绝连接:服务器不可用。

4拒绝连接:用户名或密码错误。

5拒绝连接:未经授权。

6保留给未来用。

MQTTClient_subscribe

定义:

DLLExportint MQTTClient_subscribe  (  

        MQTTClient      handle,

        const char*   topic,

        int    qos

    )  

作用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。

参数含义

handle指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充

topic订阅的主题,可使用通配符。

qos订阅的请求服务质量

MQTTClient_publishMessage

定义:

DLLExportint MQTTClient_publishMessage     (  

        MQTTClient      handle,

        const char*   topicName,

        MQTTClient_message *    msg,

        MQTTClient_deliveryToken *      dt

    )

作用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。

参数含义

handle指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充

topicName与信息相关的主题。

msg指向有效的 MQTTClient_message 结构的指针,

  其中包含要发布消息的有效负载和属性

dt指向MQTTClient_deliveryToken的指针。当函数成功返回时,dt会被赋值为代表消息的token。如果程序中没有使用传递token,将其设置为NULL。

MQTTClient_waitForCompletion

定义:

DLLExportint MQTTClient_waitForCompletion  (  

        MQTTClient      handle,

        MQTTClient_deliveryToken    dt,

        unsigned long  timeout

    )  

作用:客户端应用程序调用此函数来将主线程的执行与消息的完成发布同步。 被调用时,MQTTClient_waitForCompletion()阻塞执行,直到消息成功传递或已超过指定的时间。

参数含义

handle指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充

dt代表消息的MQTTClient_deliveryToken用来检测是否成功传递。传递token由发布函数MQTTClient_publish () 和 MQTTClient_publishMessage ()所产生。

timeout等待的最大毫秒数。

返回值:消息成功传递则返回MQTTCLIENT_SUCCESS(0) ,如果时间已过期或检测token时出问题,则返回错误码。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容