第10章:信号量与管程
信号量
信号量使用
互斥访问
条件同步
管程
经典同步问题
哲学家就餐问题
读者-写者问题
10.1 信号量(Semaphore)
回顾
- 并发问题
多线程并发导致资源竞争 - 同步概念
协调多线程对共享数据的访问
任何时刻只能有一个线程执行临界区代码 - 确保同步正确的方法
底层硬件支持
高层次的编程抽象
信号量
- 信号量是操作系统提供的一种协调共享资源访问的方法
软件同步是平等线程间的一种同步协商机制
操作系统是管理者,地位高于进程
用信号量表示系统资源的数量 - 由Dijkstra在20世纪60年代提出
早期的操作系统的主要同步机制
现在很少用(但在计算机科学研究还是非常重要) - 信号量是一种抽象的数据类型
由一个整型(sem)变量和两个原子操作组成
- P()(Prolaag(荷兰语尝试减少))
sem减1
如sem<0,进入等待,否则继续 - V()(Verhoog(荷兰语增加))
sem加1
如sem<=0,唤醒一个等待进程
信号量的特性
- 信号量是被保护的整数变量
初始化完成后,只能通过P()和V()操作修改
由操作系统保证,PV操作是原子操作 - P()可能阻塞,V()不会阻塞
- 通常假定信号量是“公平的”
线程不会被无限期阻塞在P()操作
假定信号量等待按先进先出排队
自旋锁是做不到先进先出的,因为自旋锁需要占用CPU随时地去查,有可能临界区使用者退出的时候它刚查完,下一个进入者是谁去查它就能进去
信号量的实现
ClassSemaphore {
int sem;
WaitQueue q;
}
Semaphore :: P() {
sem --;
if (sem<0) {
Add this thread t to q;
block(P);
}
}
Semaphore :: V() {
sem ++;
if (sem<=0) {
Remove a thread t from q;
wakeup(t);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
10.2 信号量的使用
- 信号量分类
可分为两种信号量
二进制信号量:资源数目为0或1
资源信号量:资源数目为任何非负值 - 两者等价
基于一个可以实现另一个 - 信号量使用
互斥访问
临界区的互斥访问控制
条件同步
线程间的事件等待 - 用信号量实现临界区的互斥访问
//每类资源设置一个信号量,其初值为1
mutex = new Semaphore(1); //初始化
mutex -> P(); //第一个线程进来之后,由1变成0,能进去;若第二个线程也想进入临界区,由0变成-1,在P操作进入等待状态
critical section;
mutex -> V(); //第一次线程执行结束,进行V操作,使信号量计数由-1变成0, 唤醒第二个进程,则第二个进程就会在第一个进程结束时进入临界区
1
2
3
4
- 必须成对使用P()和V()操作
P()操作保证互斥访问临界资源
V()操作在使用后释放临界资源
PV操作不能次序错误、重复或遗漏
用信号量实现条件同步
//条件同步设置一个信号量,其初值为0
condition = new Semaphore(0);
//线程A 线程B
...M...
condition -> P();
...N... //使用数据 ...X... //准备数据
condition -> v();
...Y...
1
2
3
4
5
6
7
8
9
- 条件等待:线程A要等待线程B执行完X模块才能执行N模块
假设B先执行完X,此时信号量变成1,则N能直接往下执行
假设A先执行完M,此时信号量变成-1,阻塞,进入等待状态,然后进程B进行V操作,释放,此时信号量为0。若信号量小于或等于0,则说明有线程在等待,此时唤醒等待中的线程A,执行N模块
生产者-消费者问题
生产者 -> 缓冲区 -> 消费者
- 有界缓冲区的生产者-消费者问题描述
一个或多个生产者在生成数据后放在一个缓冲区里
单个消费者从缓冲区取出数据处理
任何时刻只能有一个生产者或消费者可访问缓冲区 - 问题分析
任何时刻只能有一个线程操作缓冲区(互斥访问)
缓冲区空时,消费者必须等待生产者(条件同步)
缓冲区满时,生产者必须等待消费者(条件同步) - 用信号量描述每个约束
二进制信号量 mutex
资源信号量 fullBuffers
资源信号量 emptyBuffers
//信号量实现
CLass BoundedBuffer {
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}
BoundedBuffer :: Deposit(c) {
emptyBuffer -> P();
mutex -> P();
Add c to the buffer;
mutex -> V();
fullBuffers -> V();
}
BoundedBuffer :: Remove(c) {
fullBuffer -> P();
mutex -> P();
Remove c from buffer;
mutex -> V();
emptyBuffers -> V();
}
- 使用信号量的困难
读/开发代码比较困难
程序员需要能运用信号量机制
容易出错
使用的信号量已经被另一个线程占用
忘记释放信号量
不能够处理死锁问题
10.3 管程(Monitor)
- 改进信号量在处理临界区的时候的一些麻烦
- 管程是一种用于多线程互斥访问共享资源的程序结构
采用面向对象方法,简化了线程的同步机制
任一时刻最多只有一个线程执行管理代码
正在管程中的线程可临时放弃管理的互斥访问,等待事件出现时恢复
注 - 一个线程在临界区执行,必须执行到它退出临界区,它才能放弃临界区的互斥访问,而管程允许在执行的过程当中,临时放弃。放弃之后其他线程就可进入管程区域
管程的使用
- 在对象/模块中,收集相关共享数据
- 管程组成
一个锁
控制管程代码的互斥访问 - 0个或多个条件变量
0个等同于临界区
管理共享数据的并发访问
- 条件变量(Condition Variable)
条件变量是管程内的等待机制
进入管程的线程因资源被占用而进入等待状态
每个条件变量表示一种等待原因,对应一个等待队列 - Wait()操作
将自己阻塞在等待队列中
唤醒一个等待着或释放管程的互斥访问 - Signal()操作
将等待队列中的一个线程唤醒
如果等待队列为空,则等同空操作
//条件变量实现
Class Condition {
int numWaiting = 0;
WaitQueue = q;
}
Condition :: wait(lock) {
numWaiting ++;
Add this thread t to q;
release(lock);
schedule();//need mutex
require(lock);
}
Condition :: Signal() {
if (numWaiting>0) {
Remove a thread from q;
wakeup(t);//need mutex
numWaiting --;
}
}
用管程解决生产者-消费者问题
ClassBoundedBuffer {
...
Lock lock;
int count = 0;
Condition notFull,notEmpty;
}
BoundedBuffer :: Deposit(c) {
lock -> Acquire();
while (count == n)
notFull.Wait(&lock);
Add c to the buffer;
count ++;
notEmpty.Signal();
lock -> Release();
}
BoundedBuffer :: Remove(c) {
lock -> Acquire(c);
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count --;
notFull.Signal();
lock -> Release();
}
管理条件变量的释放处理方式
Hansen 管程
l.acquire()
...
x.wait() //T1进入等待
//T2进入管程 l.acquire()
...
x.Signal()
...
//T1退出管程 l.release()
... //T1恢复管程执行
l.release()
Hoare 管程
l.acquire()
...
x.wait() //T1进入等待
//T2进入管程 l.acquire()
...
//T2进入等待 x.Signal()
... //T1恢复管程执行
l.release() //T1结束
... //T2恢复管程执行
l.release()
Hansen 管程和Hoare 管程
- Hansen 管程
主要用于真实操作系统和Java中
当前正在执行的线程优先级更高
效率更高,少了一次切换 - Hoare 管程
内部的线程优先执行
优先角度考虑更合理,更容易证明正确性 - Hansen 管程和Hoare 管程区别
Hansen-style:Deposit() {
lock -> acquire();
while (count == n) {
notFull.Wait(&lock);
}
Add thing;
count ++;
notEmpty.Signal();
lock -> Release();
}
Hoare-style:Deposit() {
lock -> acquire();
if (count == n) {
notFull.Wait(&lock);
}
Add thing;
count ++;
notEmpty.Signal();
lock -> Release();
}
10.4 经典同步问题
(一)哲学家就餐问题
- 问题描述
5个哲学家围绕着一张圆桌而坐
桌上放着5支叉子
每两个哲学家之间放1支
哲学家的动作包括思考和进餐
进餐时需要同时拿到左右两边的叉子
思考时将两支叉子放回原处
如何保证哲学家们的动作有序进行?
如:不出现永远都难不到叉子的情况
- 方案1
define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
}
不正确,可能导致死锁
5个人同时拿到左边刀叉时,就会进入死锁状态
- 方案2
一旦无法充分利用系统资源,就把所有资源打成一个整包,只有一个进程可占用这一整包的资源,不管它是不是都需要,这种情况系统也能正常运行,只是效率低一些。
#define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
semaphore mutex; //互斥信号量,初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
P(mutex); //进入临界区,任一时刻只能有一个进程申请到二进制信号量
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
V(mutex); //退出临界区
}
互斥访问正确,但每次只允许一个人进餐,性能不好
- 方案3
#define N 5 //哲学家个数
semaphore fork[5]; //信号量初值为1
void philosopher(int i) //哲学家编号:0-4
while (TRUE)
{
think(); //哲学家在思考
if (i%2 == 0) {
P(fork[i]); //去拿左边的叉子
P(fork[(i+1)%N]); //去拿右边的叉子
} else {
P(fork[(i+1)%N]); //去拿右边的叉子
P(fork[i]); //去拿左边的叉子
}
eat(); //吃面条中
V(fork[i]); //放下左边的叉子
V(fork[(i+1)%N]); //放下右边的叉子
}
没有死锁,可有多人同时访问
(二)读者-写者问题
- 问题描述
共享数据的两类使用者
读者:只读取数据,不修改
写者:读取和修改数据
读者-写者问题描述:对共享数据的读写
- “读-读”允许
同一时刻,允许有多个读者同时读 - “读-写”互斥
没有写者时读者才能读
没有读者时写者才能写 - “写-写”互斥
没有其他写者时写者才能写
用信号量解决读者-写者问题
用信号量描述每个约束
- 信号量WriteMutex
控制读写操作互斥
初始化为1 - 读者计数Rcount
正在进行读操作的读者数目
初始化为0 - 信号量CountMutex
控制对读者计数的互斥修改
初始化为1
//Writer
P(WriteMutex);
write;
V(WriteMutex);
//Reader
P(CountMutex);
if(Rcount == 0) //第一个读者才需要申请读写互斥信号量,后来读者只需读者计数器加1
P(WriteMutex);
++ Rcount;
V(CountMutex);
read;
P(CountMutex);
-- Rcount;
if(Rcount == 0) //只有最后一个读者才需要释放读写互斥信号量、
V(WriteMutex);
V(CountMutex);
- 特征:读者优先
即后来的读者一定会先于等待的写者进行写操作
读者-写者问题:优先策略
- 读者优先策略
只要有读者正在读状态,后来的读者都能直接进入
如读者持续不断进入,则写者就处于饥饿 - 写者优先策略
只要有写者就绪,写者应尽快执行写操作
如写者持续不断就绪,则读者就处于饥饿
用管程解决读者-写者问题
两个基本方法
Database :: Read() {
Wait until no writers;
read database;
check out - wake up waiting writers;
}
Database :: Write() {
Wait until no readers/writers;
write database;
check out - wake up waiting readers/writers;
}
管程的状态变量
AR=0; //# of active readers
AW=0; //# of active writers
// 只有一个>0
WR=0; //# of waiting readers
WW=0; //# of waiting writers
//可能都>0
Lock lock;
Condition okToRead;
Condition okToWrite;
解决方案详情:读者
AR=0; //# of active readers
AW=0; //# of active writers
WR=0; //# of waiting readers
WW=0; //# of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;
Public Database :: Read() {
//Wait until no writers;
StartRead();
read database;
//check out - wake up waiting writers;
DoneRead();
}
Private Database :: StartRead() {
lock.Acquire();
while ((AW+WW)>0) { //条件决定了优先策略
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Private Database :: DoneRead() {
lock.Acquire();
AR--;
if ((AR==0 && WW)>0) { //条件决定了优先策略
okToWrite.signal();
}
lock.Release();
}
解决方案详情:写者
AR=0; //# of active readers
AW=0; //# of active writers
WR=0; //# of waiting readers
WW=0; //# of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;
Public Database :: Write() {
//Wait until no readers/writers;
StartWrite();
write database;
//check out - wake up waiting readers/writers;
DoneWrite();
}
Private Database :: StartWrite() {
lock.Acquire();
while ((AW+AR)>0) {
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Private Database :: DoneWrite() {
lock.Acquire();
AW--;
if (WW>0) {
okToWrite.signal();
} else if (WR>0) {
okToRead.broadcast();
}
lock.Release();
}