@(linux 编程)
一、 消息传递
pipe
管道一般为有亲缘关系进程提供单路数据流, 通过pipe(int fd[2])
创建, 返回两个文件描述符, fd[0] 用于读,fd[1]用于写。 通过 read 和 write 函数进行 操作。
父进程创建管道后 fork 子进程, 父子共享该管道的描述符(使用同一个管道)
之后双方各关闭一个描述符,实现单向通信,但需要实现双向时,可通过两个通道实现。
如下实现示例 :
父进程创建管道,创建子进程, 父关闭写端,子关闭读端,子写父读。
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <wait.h>
#define MAXLINE 80
int main(void)
{
int n;
int fd[2];
pid_t pid;
char line[MAXLINE];
if (pipe(fd) < 0) {
perror("pipe\n");
exit(1);
}
if ((pid = fork()) < 0) {
perror("fork\n");
exit(1);
}
if (pid > 0) {
close(fd[0]);//read fd
write(fd[1], "Hello world\n", 12);
wait(NULL);
}
else {
close(fd[1]); // write fd
n = read(fd[0], line, MAXLINE);
write(STDOUT_FILENO, line, n);
}
return 0;
}
以下提供一个开源例子, Webbench
是一个在linux下使用的非常简单的网站压测工具,其中使用pipe进行通信。
FIFO (有名管道)
基本使用
不考虑描述符传递,管道无名所以只能用于有亲缘关系的进程间通信。
FIFO 提供单向先进先出的数据流通道,每个 FIFO 都有一个路径名与之关联, 从而允许无亲缘进程之间进行通信。
使用FIFO 前需要通过mkfifo
创建, 如果存在可能会报错,可以通过判断错误号errno(==EEXIST)
选择忽略,创建FIFO后,就可以像读写文件一样进行操作
需要注意的是,调用open()打开命名管道的进程可能被阻塞
- 如果用读写方式(O_RDWR)打开,则不会导致阻塞;
- 如果以只读(O_RDONLY)方式打开,则调用 open() 函数的进程会被阻塞直到有写方打开管道
- 如果以写方式(O_WRONLY)打开,也会阻塞直到有读方打开管道
注意以下例子, 两个进程打开管道顺序是相方的, 否则就会导致死锁, 永远阻塞。
linux 默认read/write操作是阻塞的, 可以在打开的时候设置O_NONBLOCK
为非阻塞(或者之后使用 fcntl 函数进行设置)。
如果所有写方关闭了管道, 读方 read 函数返回0。
提供一个测试例子
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
// 设置fifo路径, 使得两个进程都能操作
#define READ_PATH "./child_write_fifo"
#define WRITE_PATH "./child_read_fifo"
// 设置fifo创建的权限644
#define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
int main(void)
{
pid_t pid;
// 新建FIFO, 如果存在,忽略错误
if ((mkfifo(READ_PATH, FILE_MODE) < 0) && (errno != EEXIST)) {
printf("Can't create %s", READ_PATH);
}
if ((mkfifo(WRITE_PATH, FILE_MODE) < 0) && (errno != EEXIST)) {
printf("Can't create %s", WRITE_PATH);
unlink(READ_PATH);
}
pid = fork();
if (pid < 0) {
printf("Fork failed");
exit(1);
}
if (pid == 0) {
printf("Child running\n");
int status = 0;
// 打开读后, 阻塞等待另一个进程打开写
int pip_read = open(WRITE_PATH, O_RDONLY);
// 打开写后, 同样需要阻塞直到另一个进程打开读
int pip_write = open(READ_PATH, O_WRONLY);
if (pip_write != -1)
write(pip_write, "Child write\n", 12);
// block and wait
printf("Child read after writed\n");
if (pip_read != -1)
read(pip_read, &status, 1);
printf("Child get status from Parent %d\n", status);
close(pip_read);
close(pip_write);
exit(1);
} else {
printf("Parent running\n");
int pip_write = open(WRITE_PATH, O_WRONLY);
int pip_read = open(READ_PATH, O_RDONLY);
char srt[128];
// block and wait
if (pip_read != -1)
read(pip_read, srt, 12);
printf("Parent get : %s", srt);
int status = 20;
if (pip_write != -1)
write(pip_write, &status, 1);
waitpid(pid, &status, 0);
printf("\nChild %d quit %d\n", pid, status);
printf("The return code is %d\n", WEXITSTATUS(status));
close(pip_read);
close(pip_write);
// if no one open fifo, sys will delete it
unlink(READ_PATH);
unlink(WRITE_PATH);
exit(0);
}
}
使用FIFO后, 通过调用unlink
删除管道。
内核为管道维护了一个访问计数, 统计打开文件描述符的个数, 调用了unlink 函数后, 如果计数不为0, 不会直接删除,会等到最后一个 close 调用使得计数为0, 才调用删除 FIFO, 如果没有调用 unlink, 即使计数为0, 也不会删除FIFO。
FIFO write 的原子性
假设一个进程打开读, 有两个进程打开同一管道同时尝试写入数据(小于PIPE_BUF 1024 到5120), FIFO 保证两次写入的完整性, 不会出现乱序。
当管道最后一个打开的文件描述符被关闭,其中的数据被丢弃!!!
Posix 和 System V 消息队列差别
- Posix 读总是返回最高优先级最早消息, 而System V可以指定任意优先级消息。
- 往一个空队列放入消息时, Posix 允许产生一个信号或者启动一个线程(异步通知)
- 队列中每个消息属性
- 一个无符号整数优先级(Posix) 或一个长整数类型(System V, 不能为0)
- 消息的数据部分长度(可以为0)
- 数据本身
优先选择使用 Posix 消息队列。
Posix 消息队列
消息队列可以认为是一个消息链表,写权限进程放入消息,读权限进程取走消息,不同前面的管道,消息队列写入前不需要有进程等待读取,消息队列是随内核持续性的。
Posix 消息队列接收返回最高优先级最早的消息。我电脑ubuntu,默认消息队列 是最大值 10。
基本使用
创建,打开,关闭和删除消息队列的 API 如下,
#include <mqueue.h>
// 打开消息队列,成功放回队列描述符
// 参数 name "/name", 便于移植
mqd_t mq_open(const char *name, int oflag);
// 创建打开消息队列
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
// 关闭消息队列, 引用计数减1
int mq_close(mqd_t mqdes);
// 删除消息队列(所有打开都关闭时)
int mq_unlink(const char *name);
mq_open
打开已经建立的消息队列,如果消息队列不存在,需要设置 mode
(读取权限)和 attr
(消息队列属性,传入NULL使用默认), 自定义 attr
,只能设置消息队列大小和消息长度,且不能超过默认值。
oflag
中如果有O_CREAT
, 消息队列不存在时会建立, 如果同时有O_EXCL
并且队列已经存在,则会报错。
其他具体参数使用可以通过 man 查询, 并且 man 中说明编译链接时, 需要加入参数 -lrt
man mq_overview
涉及如何查看系统系消息队列。
消息队列属性获取与设置接口, mqdes
是打开消息队列返回的描述符
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
其中结构体定义在 mqueue.h
文件中, 如下所示
struct mq_attr {
long mq_flags; /* message queue flags */
long mq_maxmsg; /* maximum number of messages */
long mq_msgsize; /* maximum message size */
long mq_curmsgs; /* number of messages currently queued */
long __reserved[4]; /* ignored for input, zeroed for output */
};
消息队列建立后,可以通过以上接口修改消息队列阻塞与否标志 mq_flags
, mq_maxmsg
和 mq_msgsize
用于在创建消息队列时指定队列最大消息数目和消息数据长度, 其他值只能读取。
// 消息发送与接收
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
消息队列建立时可以指定队列最大消息数和给定消息的最大字节数, MQ_PRIO_MAX
定义消息的最大优先级加1。
发送消息的时候, 传递给函数的长度是实际发送数据的大小长度,但是,在调用接收函数时,传递的长度是消息的最大长度,也就是创建消息队列时设置的长度,否则会导致接收消息失败。
发送函数当消息队列满时会阻塞,而消息接收函数会在队列为空的情况下阻塞。
详细接口使用 :
参考
异步通知
Posix 消息队列允许异步通知,告知何时有消息放入空的队列中。
- 产生一个信号
- 创建一个线程执行指定函数
// 建立或者删除异步通知事件
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
注意 :
notification 不为空,注册通知; 为空,且当前进程注册过通知,则撤销通知。
任意时刻只有一个进程可以注册接收通知。
消息放入一个空队列中,且已有进程注册通知,只有在没有其他进程使用 mq_receive 阻塞等待的情况下通知会发出。
消息通知发出后,注册即被撤销,需要重新注册通知(读取消息前重新注册)。
只有当队列为空到有数据,并且注册了通知才会发出通知。
Posix mqueue 测试代码, 发送接收, 异步信号量和线程
System V 消息队列
(新程序优先使用Posix 的队列)
System V 消息队列使用消息队列标识符来标识。
主要接口如下所示,
#include <sys/msg.h>
// 创建或访问已存在消息队列
// 返回值供其他操作函数使用, msgid
int msgget(key_t key, int oflag);
// 发送消息
int msgsnd(int msgid, const void *ptr, size_t length, int flag);
// 接收消息
ssize_t msgrcv(int msgid, void *ptr, size_t length, long type, int flag);
// 删除,修改消息队列
int msgctl(..)
消息模板, 第一个字节指定type,后续根据需求定义
struct msgbuf {
long mtype; // > 0
char mtext[1];
};
消息类型必须大于0, 后续的内容大小自行定义,内核不会解消息数据的内容。
Socket
二、 同步
互斥锁和条件变量
互斥锁用于上锁保护临界区,保证任何时刻只有一个线程在临界区执行; 条件变量用于等待。
互斥锁的API
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mptr); // 阻塞等锁
int pthread_mutex_try_lock(pthread_mutex_t *mptr); // 返回 EBUSY
int pthread_mutex_unlock(pthread_mutex_t *mptr);
条件变量
条件变量一般和互斥锁配合使用,互斥锁用于等到条件到达时处理数据的互斥性。
一般来说,条件变量返回后需要再次检查下条件是否为真(自定义标志位或计数等方式),避免虚假唤醒。
#include <pthread.h>
// 调用以下函数,先手动互斥上锁,该函数内部执行解锁并等待
// 条件到达,在此上锁并函数返回,执行后我们需要手动在释放锁
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *ptr);
int pthread_cond_timewait(pthread_cond_t *cptr, pthread_mutex_t *ptr
const struct timespec *abstime); // abstime是绝对时间
// 唤醒等待条件线程
// 如果没有等待线程, 该信号丢失
int pthread_cond_signal(pthread_cond_t *cptr);
int pthread_cond_broadcast(pthread_cond_t *cptr);
互斥锁和条件变量可以通过静态分配和动态分配获得,动态分配可以设置互斥锁,条件变量的属性,比如设置为进程共享PTHREAD_PROCESS_SHARED
(共享内存区中)用于进程间同步。
读写锁
相比互斥锁直接上锁, 读写锁将写操作和读操作进行了区分。当保护数据读比写频繁时使用。
- 只要没有线程持该锁进行写,其他多个进程可以同时持锁进行读。
- 仅当没有线程持该锁读或者写,才能有一个线程持锁进行写。
(共享锁-独占锁)
#inlcude <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_unlock(pthread_rwlock_t *rwptr);
同样,读写锁使用前,可以静态分配,也可动态分配并设置参数。
注意
线程可能在使用锁的期间被取消,退出,此时有些锁还没有释放,系统不会自动释放这些锁,需要设置清理函数来实现。
参考 man page 说明及其示例
记录上锁
当一个进程正在读写一个文件的某部分的时候, 记录锁可以阻止其他进程修改同一个文件区(范围锁,粒度1个byte),其功能类似读写锁。
实现接口 :
#include <fcntl.h>
int fcntl(int fd, int cmd, /* struct flock *arg*/);
/*
cmd :
F_SETLK 根据结构体 flock 设置锁, 错误返回 : EACCESS或者EAGAIN
F_SETLKW 同F_SETLK, 但是如果无法实施,阻塞等待
F_GETLK 利用flock结构体指定参数获取指定部分锁状态, 通过flock返回 (使用时, type不要设置为unlk, 出错)
*/
struct flock {
short l_type; /* 锁的类型: F_RDLCK, F_WRLCK, F_UNLCK */
short l_whence; /* 加锁的起始位置:SEEK_SET, SEEK_CUR, SEEK_END */
off_t l_start; /* 加锁的起始偏移,相对于l_whence */
off_t l_len; /* 上锁的字节数*/
pid_t l_pid; /* 已经占用锁的PID(只对F_GETLK 命令有效) */
/*...*/
};
- 调用 close 或者进程结束时, 自动释放所有锁
- 记录锁不被fork继承, 但是如果没有设置 FD_CLOSE , exec执行会继承记录锁
- 加读锁要求fd读权限,加写锁需要fd写权限
- 同一个进程,不管加锁与否,读取状态永远是未加锁
Posix 是劝告性锁,所以, 一个进程无视直接(不判断直接读写)读写是无法控制的。
Posix 信号量
Posix 提供有名信号量和基于内存(共享内存)的无名信号量。
下图可见两种信号量接口函数的调用差别
有名信号量
如 消息队列一节中类似, 通过以下接口打开已经存在的或者创建不存在的信号量(O_CREAT, 并指定后面两个参数), 函数调用成功, 返回指向信号量的指针供后续函数使用,否则, 返回SEM_FAILED
。
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag, ...
/*mode_t mode, unsigned int value*/);
信号量打开后,在进程退出时,会自动关闭。另外,可以通过直接调用接口 int sem_close(sem_t * sem);
关闭。
有名信号量随内核,因此关闭信号量不会删除,除非调用过 int sem_unlink(const char *name);
(引用计数)
同步时使用接口如下 :
// !!! 如果被某个信号中断, 可能提前返回, 返回错误 EINTR
int sem_wait(sem_t *sem); //大于零减1返回,否则阻塞等待
int sem_trywait(sem_t *sem); // 大于零减1返回,否则返回 EAGAIN
int sem_post(semt *sem); // 使用完成, 信号量加1返回 (可在信号处理函数调用)
// 获取信号量计数值
// 信号量被锁, 返回值 0 或者负数(其绝对值是等待信号量解锁的线程数)
int sem_getvalue(sem_t *sem, int *val);
// 成功返回0 出错 -1
基于内存的信号量
基于内存的信号量不需要指定信号name
, 由应用程序分配内存空间,通过传入的指针sem
返回。
(一般在有亲缘关系的进程线程间同步)
// sem : 程序分配 sem_t 内存空间返回指针, 调用者分配的!!
// shared = 0, 表示该信号量在同一进程不同线程间使用, sem 为进程全局变量
// 非零, 存放在某类型共享内存, 不同进程间使用, sem 共享内存中
int int_init(sem_t *sem, int shared, unsigned int value);
// 出错返回 -1, 正常不是返回 0
int sem_destroy(sem_t *sem);
对于一个初始化的信号量,再次初始化是未定义的。
使用完一个无名信号量后,调用sem_destroy摧毁它。这里要注意的是:摧毁一个有线程阻塞在其上的信号量的行为是未定义的。
System V信号量
System V 信号量一般指的是计数信号量集
三、共享内存
共享内存是可用 IPC 形式中最快的, 因为共享内存中的单个数据副本对于共享该内存区的所有线程或者进程都是可用的,对共享内存进行操作需要其他同步措施保证。
内存映射文件
通过打开一个文件, 使用函数 mmap
映射到地址地址空间,通过操作内存的方式代替文件读写,通过此方式可以实现内存共享(两个进程打开同一个文件进行映射)
#include <sys/mman.h>
// addr 指定映射其实地址, 一般为NULL, 内核分配
// len 映射字节数 (内核按页为基本单位)
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
// 成功返回映射起始地址, 出错 MAP_FAILED
prot | des |
---|---|
PROT_READ | 数据可读 |
PROT_WRITE | 数据可写 |
PROT_EXEX | 数据可执行 |
PROT_NONE | 数据不可访问 |
prot
参数指定读写权限
prot | des |
---|---|
PROT_READ | 数据可读 |
PROT_WRITE | 数据可写 |
PROT_EXEX | 数据可执行 |
PROT_NONE | 数据不可访问 |
flag
必须指定MAP_SHARED
(修改所有进程可见,改变了底层支撑对象)或者 MAP_PRIVATE
(修改只对该进程有效,不改变其底层支撑对象)其中一个, 并可选择性或上MAP_FIXED
(不适合移植,不建议; 如果addr不为空时如何处理??)
从进程地址空间删除一个映射关系, 如果在映射时使用MAP_PRIVATE
, 删除后, 之前的改动丢弃。
int munmap(void *addr, size_t len);
如果使用 MAP_SHARED
, 内核在某个时刻自动更新文件,保证内容一致, 如果我们需要马上确定,可以显示调用以下接口实现
int msync(void *addr, size_t len, int flag);
其中flag
指定执行方式 : MS_ASYNC
异步写, 马上返回(内核队列);MS_SYNC
同步写,执行完成才返回。另外可选 MS_INVALIDATE
, 与其最终副本不一致的文件数据的所有内存副本都失效。
共享内存实现方式,其除了接口差别还有 :
- Posix 共享内存的大小可以在任何时候通过
ftruncate
修改。 - System V 共享内存的大小在调用
shmget
创建时固定后不能修改。
Posix 共享内存
shm_open
获取有一个描述符号后通过 mmap
映射到内存(类似内存映射文件, 差别是不需要作为一个文件实现),然后通过 ftruncate
确定共享内存的大小, 之后可以关闭描述符,通过映射内存地址读写数据。
// 新建打开,删除共享内存对象
#include <sys/mman.h>
int shm_open(const char *name, int oflag, mode_t mode);
int shm_unlink(const char *name);
// 设置共享内存对象
#include <unistd.h>
int ftruncate(int fd, off_t length);
// 获取共享内存对象信息
#include <sys/types.h>
#include <sys/stat.h>
int fstat(int fd, struct stat *buf);
进程终止, 如果没有删除共享内存空间, 对象将继续存在。
System V共享内存
参考
- 《UNIX网络编程卷2 : 进程间通信》
- IPC分类
- Poxis 消息队列
- System V 消息队列
- 文件映射