【进程间通信】——基于消息队列的多线程定时查询

目录

在文章《【进程间通信】——消息队列基础》中简单介绍了基于POSIX的进程通信,本篇介绍介绍基于另一个使用了POSIX消息队列库mqueue,同时为了更加实用,使用了thread库创建子线程,定时查询消息队列的缓冲区,读取数据!

下面给出基于该消息队列的最小demo,分别是sender.cppreceiver.cpp文件,前者发送消息,后者接收消息。

一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,可以先启动sender后启动receiver,receiver也能接收到之前的数据,但是缓冲区满了就不行了。

一、sender.cpp

1.1 源码

//g++ -std=c++11 -o sender sender.cpp -lpthread -lrt
#include <iostream>
#include <chrono>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <thread>
#include <vector>

int main() {
    mqd_t mq;
    struct mq_attr attr;
    attr.mq_flags = 0; // mq_flags = O_NONBLOCK; // 非阻塞
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = 1024;
    attr.mq_curmsgs = 0;
    const char* queue_name = "/demo_mqueue";

    mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        exit(1);
    }

    std::vector<uint8_t> data = {42, 32, 128, 64};
    while (true) {
        // 打印发送的数据
        std::cout << "Send data: ";
        for (const auto& byte : data) {
            std::cout << static_cast<unsigned>(byte) << " ";
        }
        std::cout << std::endl;

        if (mq_send(mq, (const char*)data.data(), data.size(), 0) == -1) {
            std::cerr << "Error sending message" << std::endl;
        }

        // data所有数据+1
        for (auto& byte : data){
            byte++;
        }

        // 注意如果接收者的频率过低,会填充满缓冲区,导致发送频率就会降低,可以尝试接收1000ms,发送10ms,发送者终端输出频率也会降低
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

    mq_close(mq);
    mq_unlink(queue_name);

    return 0;
}

1.2 编译命令

g++ -std=c++11 -o sender sender.cpp -lpthread -lrt

1.3 代码解释

1.3.1 定义消息的数据类型

attr变量是一个mq_attr结构体,用于存储消息队列的属性。

mq_attr结构体包含以下成员:

  • mq_flags:消息队列的标志,如是否设置为非阻塞等。
  • mq_maxmsg:消息队列中允许的最大消息数。
  • mq_msgsize:每个消息允许的最大字节数。
  • mq_curmsgs:消息队列中当前的消息数。这是一个输出参数,不需要在mq_open时设置。

在默认情况下,消息队列处于阻塞模式,mq_flags字段默认为0,这意味着消息队列将以阻塞模式工作。因此,不需要执行任何操作来设置阻塞模式。只需确保在创建消息队列时,不要设置O_NONBLOCK标志即可。

若要设置消息队列处于阻塞模式,设置O_NONBLOCK标志即可。

阻塞和非阻塞模式主要在发送和接收消息时表现出不同的行为:

阻塞模式(默认):

  • 发送消息:当消息队列已满(达到mq_maxmsg限制)时,如果一个进程尝试向队列发送消息,它将被阻塞,直到队列有足够的空间容纳新消息为止。
  • 接收消息:当消息队列为空时,如果一个进程尝试从队列中接收消息,它将被阻塞,直到有消息可用为止。

非阻塞模式(O_NONBLOCK):

  • 发送消息:当消息队列已满时,如果一个进程尝试向队列发送消息,它将立即返回一个错误(EAGAIN),而不是等待空间可用。
  • 接收消息:当消息队列为空时,如果一个进程尝试从队列中接收消息,它将立即返回一个错误(EAGAIN),而不是等待消息。

1.3.2 mq_open() 函数

mq_open() 函数用于打开或创建一个消息队列。函数原型如下:
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

mq_open:打开一个消息队列。其参数包括队列名称、打开模式、访问权限以及一个指向mq_attr结构体的指针。它返回一个mqd_t类型的描述符,用于后续操作。

参数:

  • name:消息队列的名称。该名称应以斜线(/)开头。
  • oflag:打开标志,指定消息队列的访问模式,例如只读(O_RDONLY)、只写(O_WRONLY)或读写(O_RDWR)。还可以与其他标志组合使用,如 O_CREAT(如果不存在,则创建消息队列)和 O_EXCL(与O_CREAT一起使用时,如果消息队列已经存在,则返回错误)。
  • mode:创建消息队列时的权限。通常表示为八进制,例如0644。
  • attr:指向mq_attr结构体的指针,用于设置消息队列的属性。如果为NULL,则使用系统默认值。
    返回值:成功时返回消息队列描述符(mqd_t类型),失败时返回-1。

1.3.3 mq_send() 函数

mq_send()函数向消息队列发送消息。函数原型:
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);

参数包括消息队列描述符、消息内容的指针、消息的大小以及消息的优先级。

参数:

  • mqdes:消息队列描述符,由mq_open函数返回。
  • msg_ptr:指向要发送消息的指针。
  • msg_len:消息的长度(字节数)。
  • msg_prio:消息的优先级。优先级值越小,优先级越高。
  • 返回值:成功时返回0,失败时返回-1。

1.3.3 mq_close() 函数

mq_close() 函数用于关闭一个消息队列,函数原型:

int mq_close(mqd_t mqdes);

参数:

  • mqdes:消息队列描述符,由mq_open函数返回。
  • 返回值:成功时返回0,失败时返回-1。

一个进程结束,会自动调用关闭打开着的消息队列。消息队列关闭后,并不会被系统中删除。

1.3.4 mq_unlink() 函数

mq_unlink用于删除一个消息队列。函数原型:
int mq_unlink(const char *name);

参数:

  • name:要删除的消息队列的名称。
  • 返回值:成功时返回0,失败时返回-1。

消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。

二、receiver.cpp

2.1 源码

// g++ -std=c++11 -o receiver receiver.cpp -lpthread -lrt
#include <iostream>
#include <thread>
#include <chrono>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <cstring>
#include <vector>
#include <cstdint>

void receive_and_print_int() {
    const char* queue_name = "/demo_mqueue";
    mqd_t mq;
    struct mq_attr attr;

    attr.mq_flags = 0;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = 1024;
    attr.mq_curmsgs = 0;

    mq = mq_open(queue_name, O_CREAT | O_RDONLY, 0644, &attr);
    if (mq == (mqd_t)-1) {
        std::cerr << "Error opening message queue" << std::endl;
    }

    while (true) {
        std::vector<uint8_t> received_data(1024);
        ssize_t received_size = mq_receive(mq, (char*)received_data.data(), received_data.size(), NULL);
        if (received_size == -1) {
            std::cerr << "Error receiving message" << std::endl;
        }
        received_data.resize(received_size);

        std::cout << "Received data: ";
        for (const auto& byte : received_data) {
            std::cout << static_cast<unsigned>(byte) << " ";
        }
        std::cout << std::endl;

        std::cout << "runing receive_and_print_int thread!" << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    mq_close(mq);
    mq_unlink(queue_name);
}

int main() {
    std::thread sub_thread(receive_and_print_int);
    while (true) {
        std::cout << "Hello World" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

    sub_thread.join();

    return 0;
}

2.2 编译命令

g++ -std=c++11 -o receiver receiver.cpp -lpthread -lrt

2.3 代码解释

2.3.1 std::thread()函数

使用std::thread创建一个线程并将一个函数作为参数传递时,线程将立即开始运行该函数。在这个例子中,std::thread sub_thread(receive_and_print_int);创建了一个新线程,并立即开始执行receive_and_print_int函数。

2.3.1 sub_thread.join()函数

sub_thread.join()函数是一个阻塞调用,它会等待sub_thread线程执行完毕后才继续执行主线程。在这个例子中,receive_and_print_int函数包含一个无限循环,所以它将永远不会结束。因此,在这个特定示例中,sub_thread.join();实际上永远不会返回,主线程将一直等待sub_thread线程完成。

通常,在程序结束之前,需要使用join()函数等待所有子线程完成。这样可以确保子线程有足够的时间来完成它们的任务,避免在子线程尚未完成时主线程就结束,导致潜在的问题。

三、注意事项

如果接收者的频率过低,会填充满缓冲区,导致发送频率就会降低,可以尝试接收1000ms,发送10ms,发送者终端输出频率也会降低

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容