目录
在文章《【进程间通信】——消息队列基础》中简单介绍了基于POSIX的进程通信,本篇介绍介绍基于另一个使用了POSIX消息队列库mqueue
,同时为了更加实用,使用了thread
库创建子线程,定时查询消息队列的缓冲区,读取数据!
下面给出基于该消息队列的最小demo,分别是sender.cpp
和receiver.cpp
文件,前者发送消息,后者接收消息。
一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,可以先启动sender后启动receiver,receiver也能接收到之前的数据,但是缓冲区满了就不行了。
一、sender.cpp
1.1 源码
//g++ -std=c++11 -o sender sender.cpp -lpthread -lrt
#include <iostream>
#include <chrono>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <thread>
#include <vector>
int main() {
mqd_t mq;
struct mq_attr attr;
attr.mq_flags = 0; // mq_flags = O_NONBLOCK; // 非阻塞
attr.mq_maxmsg = 10;
attr.mq_msgsize = 1024;
attr.mq_curmsgs = 0;
const char* queue_name = "/demo_mqueue";
mq = mq_open(queue_name, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
std::vector<uint8_t> data = {42, 32, 128, 64};
while (true) {
// 打印发送的数据
std::cout << "Send data: ";
for (const auto& byte : data) {
std::cout << static_cast<unsigned>(byte) << " ";
}
std::cout << std::endl;
if (mq_send(mq, (const char*)data.data(), data.size(), 0) == -1) {
std::cerr << "Error sending message" << std::endl;
}
// data所有数据+1
for (auto& byte : data){
byte++;
}
// 注意如果接收者的频率过低,会填充满缓冲区,导致发送频率就会降低,可以尝试接收1000ms,发送10ms,发送者终端输出频率也会降低
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
mq_close(mq);
mq_unlink(queue_name);
return 0;
}
1.2 编译命令
g++ -std=c++11 -o sender sender.cpp -lpthread -lrt
1.3 代码解释
1.3.1 定义消息的数据类型
attr
变量是一个mq_attr
结构体,用于存储消息队列的属性。
mq_attr
结构体包含以下成员:
-
mq_flags
:消息队列的标志,如是否设置为非阻塞等。 -
mq_maxmsg
:消息队列中允许的最大消息数。 -
mq_msgsize
:每个消息允许的最大字节数。 -
mq_curmsgs
:消息队列中当前的消息数。这是一个输出参数,不需要在mq_open时设置。
在默认情况下,消息队列处于阻塞模式,mq_flags字段默认为0,这意味着消息队列将以阻塞模式工作。因此,不需要执行任何操作来设置阻塞模式。只需确保在创建消息队列时,不要设置O_NONBLOCK标志即可。
若要设置消息队列处于阻塞模式,设置O_NONBLOCK标志即可。
阻塞和非阻塞模式主要在发送和接收消息时表现出不同的行为:
阻塞模式(默认):
- 发送消息:当消息队列已满(达到mq_maxmsg限制)时,如果一个进程尝试向队列发送消息,它将被阻塞,直到队列有足够的空间容纳新消息为止。
- 接收消息:当消息队列为空时,如果一个进程尝试从队列中接收消息,它将被阻塞,直到有消息可用为止。
非阻塞模式(O_NONBLOCK):
- 发送消息:当消息队列已满时,如果一个进程尝试向队列发送消息,它将立即返回一个错误(EAGAIN),而不是等待空间可用。
- 接收消息:当消息队列为空时,如果一个进程尝试从队列中接收消息,它将立即返回一个错误(EAGAIN),而不是等待消息。
1.3.2 mq_open() 函数
mq_open() 函数用于打开或创建一个消息队列。函数原型如下:
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
mq_open:打开一个消息队列。其参数包括队列名称、打开模式、访问权限以及一个指向mq_attr结构体的指针。它返回一个mqd_t类型的描述符,用于后续操作。
参数:
-
name
:消息队列的名称。该名称应以斜线(/)开头。 -
oflag
:打开标志,指定消息队列的访问模式,例如只读(O_RDONLY)、只写(O_WRONLY)或读写(O_RDWR)。还可以与其他标志组合使用,如 O_CREAT(如果不存在,则创建消息队列)和 O_EXCL(与O_CREAT一起使用时,如果消息队列已经存在,则返回错误)。 -
mode
:创建消息队列时的权限。通常表示为八进制,例如0644。 -
attr
:指向mq_attr结构体的指针,用于设置消息队列的属性。如果为NULL,则使用系统默认值。
返回值:成功时返回消息队列描述符(mqd_t类型),失败时返回-1。
1.3.3 mq_send() 函数
mq_send()函数向消息队列发送消息。函数原型:
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
参数包括消息队列描述符、消息内容的指针、消息的大小以及消息的优先级。
参数:
-
mqdes
:消息队列描述符,由mq_open函数返回。 -
msg_ptr
:指向要发送消息的指针。 -
msg_len
:消息的长度(字节数)。 -
msg_prio
:消息的优先级。优先级值越小,优先级越高。 -
返回值
:成功时返回0,失败时返回-1。
1.3.3 mq_close() 函数
mq_close() 函数用于关闭一个消息队列,函数原型:
int mq_close(mqd_t mqdes);
参数:
-
mqdes
:消息队列描述符,由mq_open函数返回。 -
返回值
:成功时返回0,失败时返回-1。
一个进程结束,会自动调用关闭打开着的消息队列。消息队列关闭后,并不会被系统中删除。
1.3.4 mq_unlink() 函数
mq_unlink用于删除一个消息队列。函数原型:
int mq_unlink(const char *name);
参数:
-
name
:要删除的消息队列的名称。 -
返回值
:成功时返回0,失败时返回-1。
消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。
二、receiver.cpp
2.1 源码
// g++ -std=c++11 -o receiver receiver.cpp -lpthread -lrt
#include <iostream>
#include <thread>
#include <chrono>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <cstring>
#include <vector>
#include <cstdint>
void receive_and_print_int() {
const char* queue_name = "/demo_mqueue";
mqd_t mq;
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 1024;
attr.mq_curmsgs = 0;
mq = mq_open(queue_name, O_CREAT | O_RDONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
std::cerr << "Error opening message queue" << std::endl;
}
while (true) {
std::vector<uint8_t> received_data(1024);
ssize_t received_size = mq_receive(mq, (char*)received_data.data(), received_data.size(), NULL);
if (received_size == -1) {
std::cerr << "Error receiving message" << std::endl;
}
received_data.resize(received_size);
std::cout << "Received data: ";
for (const auto& byte : received_data) {
std::cout << static_cast<unsigned>(byte) << " ";
}
std::cout << std::endl;
std::cout << "runing receive_and_print_int thread!" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
mq_close(mq);
mq_unlink(queue_name);
}
int main() {
std::thread sub_thread(receive_and_print_int);
while (true) {
std::cout << "Hello World" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
sub_thread.join();
return 0;
}
2.2 编译命令
g++ -std=c++11 -o receiver receiver.cpp -lpthread -lrt
2.3 代码解释
2.3.1 std::thread()函数
使用std::thread创建一个线程并将一个函数作为参数传递时,线程将立即开始运行该函数。在这个例子中,std::thread sub_thread(receive_and_print_int);创建了一个新线程,并立即开始执行receive_and_print_int函数。
2.3.1 sub_thread.join()函数
sub_thread.join()函数是一个阻塞调用,它会等待sub_thread线程执行完毕后才继续执行主线程。在这个例子中,receive_and_print_int函数包含一个无限循环,所以它将永远不会结束。因此,在这个特定示例中,sub_thread.join();实际上永远不会返回,主线程将一直等待sub_thread线程完成。
通常,在程序结束之前,需要使用join()函数等待所有子线程完成。这样可以确保子线程有足够的时间来完成它们的任务,避免在子线程尚未完成时主线程就结束,导致潜在的问题。
三、注意事项
如果接收者的频率过低,会填充满缓冲区,导致发送频率就会降低,可以尝试接收1000ms,发送10ms,发送者终端输出频率也会降低