从 0 开始学习 Linux 系列之「23.消息队列 Msg Queue」

消息队列

版权声明:本文为 cdeveloper 原创文章,可以随意转载,但必须在明确位置注明出处!

消息队列 Msg Queue

如果你在 Windows 上开发过应用程序,想必你应该听过消息队列这个概念。在 Windows 中每个程序都有一个消息队列,整个程序在一个 loop 中等待从消息队列中取消息并执行,所以称 Windows 上的程序为事件驱动型。同样在 Linux 开发中也有消息队列这个概念,不过 Linux 中的消息队列是用来进行 IPC 的,本质上跟共享内存一样也是内存维护的一片内存区域。这篇文章带你学习消息队列的相关操作和内核机制。

系统中的消息队列

可以使用 ipcs -q 查看系统当前使用的消息队列(以下简称 MQ):

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages

我的系统当前没有任何 MQ,后面我们会用程序创建一个 MQ,然后再用这个命令查看。

消息队列的原理

MQ 传递的是消息,消息即是我们需要在进程间传递的数据。MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID:qid)来区分,qid 是唯一的,用来区分不同的 MQ。在进行进程间通信时,一个进程将消息加到 MQ 尾端,另一个进程从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了进程间的通信。如下 MQ 的模型:

消息队列

进程 A 向内核维护的消息队列中发消息,进程 B 从消息队列中取消息,从而实现了 A 和 B 的进程间通信。了解了原理,来看看如何使用 MQ。

使用消息队列

MQ 的 API 操作与共享内存几乎是相同的,分为下面 4 个步骤:

  1. 创建和访问 MQ
  2. 发送消息
  3. 接受消息
  4. 删除 MQ

来分别学习这些函数。

1. 创建:msgget

使用 msgget 可以创建一个消息队列,需要指定创建的 key 和标志,key 与返回的 qid 有关系。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * key:用来指定返回 MQ 的 ID
 * msgflg:创建的标志,例如 IPC_CREAT
 * return:成功返回队列 ID,失败返回 -1, 并设置 erron
 */
int msgget(key_t key, int msgflg);

2. 发送:msgsnd

使用 msgsnd 来发送一个消息,必须要有写消息队列的权限

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msgid:消息队列 ID
 * msgp:指向 msgbuf 的指针,用来指定发送的消息
 * msgsz:要发送消息的长度
 * msgflg:创建标记,如果指定 IPC_NOWAIT,失败会立刻返回
 * return:成功返回 0, 失败返回 -1, 并设置 erron
 */
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

3. 发送:msgrcv

使用 msgrcv 来从 msgqid 标识的 MQ 中读取一个消息放到 msgp 指定的内存中,必须要有读消息队列的权限

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msgid:消息队列 ID
 * msgp:指向 msgbuf 的指针,用来接收消息
 * msgsz:要接收消息的长度
 * msgtyp:接收消息的方式
 * msgflg:创建标记,如果指定 IPC_NOWAIT,获取失败会立刻返回
 * return:成功返回实际读取消息的字节数, 失败返回 -1, 并设置 erron
 */
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

注意:参数 msgsz 指定由 msgp 参数指向的结构的成员 mtext 的最大大小(以字节为单位),msgtyp 也有 3 种方式:

  1. msgtyp = 0:读取队列中的第一条消息
  2. msgtyp > 0:读取队列中类型为 msgtyp 的第一条消息,除非在 msgflg 中指定了 MSG_EXCEPT,否则将读取类型不等于 msgtyp 的队列中的第一条消息。
  3. msgtyp < 0:读取队列中最小类型小于或等于 msgtyp 绝对值的第一条消息

4. 删除及控制:msgctl

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/*
 * msqid:消息队列 ID
 * cmd:控制命令,例如 IPC_RMID 删除命令
 * buf:存储消息队列的相关信息的 buf
 * return:成功根据不同的 cmd 有不同的返回值,失败返回 -1, 并设置 erron
 */
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

例子:使用消息队列来进程间通信

我们也写两个程序来用 MQ 来进程间通信,一个进程 A 写入字符串数据到消息队列,另一个进程 B 从队列中取出数据。

write_msg.c

// write_msg.c

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

struct msgbuf {
    long mtype;
    char mtext[255];
};

int main() {
    // 1. 创建一个消息队列,用 key = 123 来唯一表示这个队列    
    int msg_id = msgget(123, IPC_CREAT | 0666);
    if (msg_id != -1) {
        
        // 2. 初始化要发生的消息
        struct msgbuf mybuf;
        mybuf.mtype = 1;
        strcpy(mybuf.mtext, "I'm send process.\n");
        
        // 3. 发送消息
        if (msgsnd(msg_id, &mybuf, sizeof(mybuf.mtext), 0)) 
            printf("success\n");            
        else
            perror("msgsnd:");
    } else {
        perror("msgget:");
    }

    return 0;
}

read_msg.c

这是读取进程:

// read_msg.c
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

struct msgbuf {
    long mtype;
    char mtext[255];
};

int main() {
    // 1. 获取消息队列
    int msg_id = msgget(123, IPC_CREAT | 0666);
    if (msg_id != -1) {
        struct msgbuf mybuf;
        // 2. 接收第一条消息,存到 mybuf 中
        if (msgrcv(msg_id, &mybuf, sizeof(mybuf.mtext), 0, IPC_NOWAIT) != -1) { 
            printf("read success: %s\n", mybuf.mtext);
            // 3. 接收完就删除这个消息队列
            if (msgctl(msg_id, IPC_RMID, 0) != -1)
                printf("delete msg success\n");
        } else {
            perror("msgsnd:");
        }

    } else {
        perror("msgget:");
    }

    return 0;
}

编译:

gcc write_msg.c -o write_msg
gcc read_msg.c -o read_msg

运行:

./write_msg

msgsnd:: success

消息队列创建成功,并且消息也发送了,我们再次用 ipcs -q 看看存不存在这个队列:

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x0000007b 32768      orange     666        255          1       

成功输出了我们创建的消息队列,并且大小等于 255 B,里面有一条我们刚才发送的消息,来读取这个消息:

./read_msg

read success: I'm send process.

delete msg success

消息接收成功,并且也删除了消息队列,看看有没有删除成功:

ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages

删除成功!一样,来看看消息队列在内核中的实现。

消息队列在内核中的实现

1. struct msg_queue

消息队列在内核中用下面的数据结构表示

// Linux 3.4: _msg_sm.h
struct msg_queue {
    struct list_head list_elem;
    struct msg_mgr *msg_mgr;
    u32 max_msgs;       /* Node message depth */
    u32 msgq_id;        /* Node environment pointer */
    struct list_head msg_free_list; /* Free MsgFrames ready to be filled */
    
    /* Filled MsgFramess waiting to be read */
    struct list_head msg_used_list;
    void *arg;      /* Handle passed to mgr on_exit callback */
    struct sync_object *sync_event; /* Signalled when message is ready */
    struct sync_object *sync_done;  /* For synchronizing cleanup */
    struct sync_object *sync_done_ack;  /* For synchronizing cleanup */
    struct ntfy_object *ntfy_obj;   /* For notification of message ready */
    bool done;      /* TRUE <==> deleting the object */
    u32 io_msg_pend;    /* Number of pending MSG_get/put calls */
};

可以看到消息队列实际上就是一个链表。

2. 分配获取消息队列

首先上层应用通过调用 msgget 来进行系统调用,然后陷入内核:

int msgget(key, msgflg)
    key_t key;
    int msgflg;
{
    return INLINE_SYSCALL (ipc, 5, IPCOP_msgget, key, msgflg, 0, NULL);
}

消息队列的内核实现机制和共享内存几乎相同:在内核开辟一片内存空间存放消息队列,不同的进程使用这个消息队列(内存空间)来通信,与共享内存使用相同的一组回调函数 ipc_ops

msgget 的基本的调用过程如下:

msgget

比较重要的是最后一步回调 newque 函数,这个函数在内核中分配了一个消息队列 msg:

newque

3. 发送消息

上层的 msgsnd 系统调用最后会调用到内核的 do_msgsnd 函数,过程如下:

sendmsg

具体的过程这里就不分析了,主要就是在链表中添加一项。

4. 接受消息

上层的 msgrcv 系统调用最后会调用到内核的 do_msgrcv 函数,过程如下:

rcvmsg

同样接收消息会从链表中删除一项,具体的过程可以根据这个路线仔细分析。

结语

本次我们学习了跟共享内存类似的 IPC 方式:消息队列 Msg Queue,它的使用 API 和内核实现原理跟共享内存是差不多,建议你对照两者进行学习,以此来更好的理解。

感谢你的阅读,我们下次再见 :)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容