在之前的课程中,我们已经学习了进程相关的知识。进程是计算机程序被执行的一个实例(instance),一个进程可能由一个或多个线程(thread)组成,同时执行指令。在这一章我们将学习线程的知识,包括如何写一个多线程的程序,如何处理多线程引发的问题。
线程是啥?线程(thread) 的正式英语名称是“thread of execution”。它代表一列有顺序的,需要 CPU执行的指令,可以随时被内核开始、中断或继续运行。
线程使用栈来记忆如何从函数中返回,以及存储变量和参数等等。在多线程状态下,每个线程拥有一个自己的栈和程序计数器(PC),而堆,地址空间和全局变量则是多个线程共用的。每个线程对应一个叫做** 线程控制块(Thread Control Block,TCB) **的数据结构,里面存储了这个线程的栈指针、程序计数器、处理器状态、线程标识符(类似于进程标识符,是系统中唯一标明这个线程的一个数字)等等。
我们过去应用多进程是为了使得多个程序并发,改善资源利用效率。但是各个进程之间不共享内存地址,给我们造成了许多不便。使用多线程则可以避免这些问题。不同线程之间共享地址空间,只是栈和程序计数器不同,因此两个线程间通信也较为容易。更为重要的是,在不同进程间切换时,由于进程地址空间不同,我们需要清空翻译快表和基于逻辑地址的高速缓冲存储器,这一过程会使我们的程序运行效率大打折扣;线程间切换就没有这个问题,对于提高程序运行效率大有裨益。
当然有得必有失,下面这首小诗就表现出了多线程程序的问题:
一个程序员遇到了一个难题
她决定用多线程来解决
于是现在两个问题了她有
本来诗的最后一句应该是“于是现在她有了两个问题”,但我们得到的结果却是“于是现在两个问题了她有”。多线程经常面临的问题是多个线程的运行顺序是不确定的,且当它们同时修改一个变量时我们最终得到的结果也是不确定的。为了控制多线程运行的结果,我们将在这一章引入互斥和同步的概念,并介绍实现互斥和同步的方法。不过在开始介绍有关互斥和同步的概念以前,让我们先来夯实基础,学习线程的不同类型、Linux 对于线程的实现和 Linux Pthread 库包含的函数。
不同系统中对于线程的实现主要分三种类型:
- 内核级线程
- 用户级线程
- 混合式线程
要强调一点,就是这里的内核级和用户级指的 并不是 线程的特权等级!一个用户进程建立的线程还是只有原来这个进程的特权等级,不能执行只有内核才能执行的指令。用户级和内核级指的是这个线程是会被用户进程还是内核调度。
我们虽然还没有学到任务调度的算法,但我们已经知道,系统会用某种方式调度不同的任务,使他们在处理器上交替运行,这个过程就是内核对于任务的调度。一个内核级的线程就将在系统中获得被调度的权限,在这种情况下,如果一个进程有两个线程,那么这两个线程会被独立地调度,因此这个进程占取的处理器时间就是这两个线程占取的处理器时间之和。
在内核级线程模型中,一个线程如果进行 I/O 操作而被阻塞,那么这个进程剩余的线程还可以继续运行,因此线程的状态不等于进程的状态。同样地,如果一个线程给自己的进程发送 sleep 信号,这个线程依然可以继续运行。
与内核级相对的用户级线程就没有这种权限了。
如果一个系统使用用户级线程的模式,那么它就只会调度进程,进程内部再自行调度线程。这就是说
- 属于同一个进程的两个线程永远不会同时在两个处理器上运行;
- 进程也不会因为多分出了几个线程就获得更多的处理器时间。
更糟糕的是,如果一个线程由于 I/O 或其它原因被阻塞,那么整个进程就都会被阻塞;而在内核级线程的模型中,一个线程被阻塞后其它线程还可以继续运行。看到这里你可能会问,这种用户级线程对于分出线程的进程来讲有什么好处呢?
相对于分出子进程运行同样的内容,用户级线程的优势是显而易见的——多个线程间可以共享地址空间,减少通信带来的麻烦。
相对于内核级线程来讲,用户级线程有两个优点。
- 一方面,它减少了线程间上下文切换的代价。内核级线程被系统调度,因此同一进程中不同线程间的上下文切换也会导致用户态向内核态的转换,这就包含了从一个用户线程进入内核线程,再进入另一个用户级线程的繁琐过程。用户级线程由于不涉及用户态向内核态的转换,也就没有这些缺点。
- 另一方面,同一进程不同用户级线程间的调度完全由用户进程本身决定,用户进程就可以用自己的调度算法选择最优的调度方法;内核级线程在这方面就没有这种优势——内核对于用户级进程来讲相当于一个黑箱,我们无法知道里面运行的是何种调度算法,因此不能预测或控制它的效率。
学完了内核级线程和用户级线程,你可能会问一个问题:有没有一种线程的设计方法,能综合内核级线程和用户级线程的优点呢?这就是混合式线程。在混合式线程中,用户即可以建立用户级线程,也可以通过系统调用建立内核级线程。假设一个进程建立了 N 个用户级线程,M 个内核级线程,那么 ,且用户可以自己调整用户级进程向内核级线程的映射。这种方法使对应着同一个内核级线程的用户级线程之间的切换更加高效,同时又使一个进程能够获得更多处理器时间、不会被一个线程阻塞。
这种线程模型的缺点就在于它非常复杂、不利于实现。FreeBSD 和 NetBSD 两个系统都曾经使用过华盛顿大学在一篇论文中提出的混合式线程模型,但它们现在都已经放弃了这种模型;大部分 Linux 系统中使用的也是较为简单的内核级线程模型。在这一章中,可以假设我们所讲的线程都是内核级线程,而非用户级线程。
在命名线程模型时,我们也可以用不同模型中內核级线程和用户级线程之间的对应关系来区別这些模型。
- 內核级线程模型中,由于每个线程都是一个可以被內核调度的任务,用户级线程与内核级线程的对应关系是1:1的,我们就叫它1:1线程模型;
- 与之形成对比的是用户级线程模型,在这种模型中一个进程中的多个用户级线程都被映射到一个可以调度的任务中因此用户级线程与内核级线程的对应关系是N:1的,我们就叫它N:1模型;
- 最后,混合式线程模型中,一个进程可以建立N个用户级线程和M个內核级线程,因此它就被称为N:M模型。
练习:
竞争与互斥
上一节我们学习了如何写一个简单的多线程程序。多线程虽然方便快捷,但是问题也不少。在第一节的小诗中我们已经见识到了多线程执行顺序的不确定性所带来的问题;多线程程序还面临着另一个问题:由于不同线程间共享内存地址空间,一旦两个线程同时试图修改同一个地址,我们就面临着 资源竞争(race condition),而竞争的结果是不确定的。这一节我们来见识一下资源竞争带来的问题。
出于某种原因,你需要将一个整数从零逐个增加到一千万。为了节约时间,我们将使用两个线程分别增加这个整数。我们有如下代码:
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
int *number;
void *adding(void *arg){
for(int i = 0; i < 5000000; i++){
(*(int *)arg)++;
}
return NULL;
}
void *adding(void *arg);
int main(){
number = malloc(sizeof(int));
pthread_t p[2];
pthread_create(&p[0], 0, adding, number);
pthread_create(&p[1], 0, adding, number);
pthread_join(p[0], (void**)NULL);
pthread_join(p[1], (void**)NULL);
printf("%d\n", *number);
return 0;
}
编译执行之后,我们的程序会在屏幕上打印出什么呢?会是一个 1000000010000000 么? 么?或者程序崩溃因为你不能同时修改同一片内存?答案是:都不是,我们都不确定会打印出什么。结果一定是 50000005000000 到 1000000010000000 之间的一个数。
我们增加 number 的方法是使用 += 操作符。问题在于,这个指令并不是不可拆分的(atomic)。如果你学过一些计算机组成原理,你可能知道,这个指令实际上是 CPU 先将这个数字读取到寄存器中,然后增加1,然后写回内存。在这个过程中,很可能出现的一个情况是当一个线程从内存中读取这个整数之后,还尚未增加或者写回它,另一个线程也从内存中读取了它。这样两个线程写回内存的值其实是一样的。也就是说,两个线程同时抓取内存资源时若无一定先后顺序,会造成资源竞争。
牛奶过量
上一节中我们定义了资源竞争出现的条件,这一节中我们就来介绍一下解决资源竞争的方法。在讲解这一部分内容时,教授经常使用的一个例子就是牛奶过量问题(Too Much Milk)。
假设你和你妈每天早晨都要喝牛奶,有一天你回家发现冰箱里没有牛奶了,于是你出去买牛奶;等你回到家、打开冰箱,却发现已经有一盒牛奶了,因为你妈妈不知道你去买牛奶了,所以也去买了一盒牛奶。这样有了两盒牛奶,你们就不一定能在牛奶过期以前把它们全部喝完了。我们可以把买牛奶这一行为看做临界区,它必须遵守由 Dijkstra 提出的临界区调度的三个原则:
- 一次只有一个线程执行临界区的指令;
- 如果一个线程已经在执行临界区指令则其它线程必须等待;
- 一个等待中的线程在有限的等待时间后会进入临界区;
前两条原则保证了线程的安全性,第三条保证了程序运行的活性(即,程序不会在某个指令永远的卡住)。
为了实现上面三条原则,我们需要一种通知另一个线程的办法;现实生活中我们可以在冰箱上留一个便签,写上“我去买牛奶了”,对方就不会再买牛奶了,我们可以尝试把这个过程写成代码:
int note = 0;
int milk = 0;
if (!milk ) { //A
if (!note) { //B
note = 1; //C
milk++; //D
note = 0; //E
}
}
现在加入我们有两个线程执行这段代码,会不会出现问题呢?让我们将这两个线程命名为 1,2
;我们用数字角标表示这个指令是由哪个线程执行的,那么下面这个指令序列就会给我们造成问题:A_1
,B_1
,A_2
,B_2
,C_1
,D_1
,E_1
,C_2
,D_2
,E_2
。
两个线程分别判断没有牛奶也没有字条以后分别进入了买牛奶的过程,这样尽管我们有了字条,还是买了两盒牛奶,违反了第一条原则。
这里我们面临的主要问题是人的行为是连续的,所以我们不会在发现没有纸条之后离开一段时间再去留下纸条、买牛奶(当然,如果你非要这么干那也没有办法),但线程可能在任何指令处被系统调度器打断,所以我们不能保证上面这段代码在两个线程中运行的结果。
怎样才能解决上面提到的问题呢?上面的问题似乎主要来源于一点,就是我们先检查了 note 的值、后将 note 值设为1 。如果在这两个指令之间另一个线程查看了 note 的值就不会知道我们已经决定去买牛奶了。因此我们可以先写字条,然后再查看对方是否已经留下字条来决定我们是否去买牛奶:
int note1 = 1; //线程2会把这一行替换为 note2 = 1;
//并把下面的 if 判断换为判断 note1 是否为 1
if (!milk) {
if (!note2) {
milk++;
}
}
note1 = 0;
很可惜,这种方法会违反第三条原则,因为如果线程 1 和线程 2 分别检查发现没有牛奶,然后分别发现对方已经留下字条,那么就没有人会去买牛奶了。我们希望如果两个线程中有一个在发现对方已经留下字条后再等待一会儿,之后重新查看冰箱里是否有牛奶。这就给了我们下面这种解决方法:
- 线程 1 执行的代码与前面差别不大:
int note1 = 1;
if (!milk) {
if (!note2) {
milk++;
}
}
note1 = 0;
- 线程 2 执行的代码却与前面的有一定的差别:
int note2 = 1;
while (note1){
;
}
if (!milk) {
milk++;
}
note2 = 0;
这段代码可以确保在没有牛奶的情况下,有且只有一个人回去买牛奶。如果你不信的话我们可以来分析一下这两个线程的运行过程。
- 假如线程 1 先开始运行,那么线程 1 就会先将 note1 设为 ,之后分为两种情况,
- 如果线程 2 在线程 1 买牛奶以前就开始运行,那么 note2 也会被设为 1,然后线程 2 就会卡在 whileloop 中,直到系统调度使得线程 1 继续运行,线程 1 发现 note2 为1 ,就不会买牛奶,而直接将 note1 设为 0,这时线程 2 就可以继续运行,去买牛奶。
- 另一种情况是,线程 2 在线程 1 买完牛奶后再进入系统,那么它就会直接跳过 while loop,发现已经有牛奶,就会拿走字条。
- 最后一种情况是,线程 2 先进入系统,那它就会直接去买牛奶,这时线程 1 如果进入发现有牛奶或有字条就会直接拿走字条。因此我们可以保证有且只有一个人会买牛奶。
上面这种算法实际上就是 Peterson 算法,我们在这里不作介绍,有兴趣的同学可以在网上查看这个算法的应用。从上面这个例子中我们可以看出,在并发多线程中实现互斥是比较复杂的。我们上面提到的方法虽然确实保证了中间的临界区只有一段代码运行,但它只适用于 2 个线程,如果我们想将它推广到多个进程那么问题就会更加复杂,而且如果我们想用两个线程执行同一段代码,那么上面代码中的note1,note2就不适用了,因为运行中的线程不会知道自己是 1 还是 2。接下来的章节我们会介绍两种更为简便实用的概念:锁和条件变量。
锁
为了实现互斥,我们需要简便的方法来判定一个线程是否有权利执行一段临界区的代码。锁就是这样一种方法:为了防止你妈妈和你同时去买牛奶,你可以在去买牛奶以前给冰箱上个锁,这样你妈妈看到冰箱已经上锁就知道,你一定已经去买牛奶了。在程序中,锁也是这样一种存在——我们为保护一些共享的数据而建立一个锁,每次修改这些数据以前都先试图获得锁,成功获得锁后再进入临界区修改数据。
在这里我们需要强调一点:锁是用来锁住数据的,而不是用来锁住代码的。你的每个锁一定会有它保护的共享数据,所有调用和修改该数据的操作都会被锁保护。如果你发现自己想不明白自己写的代码中的锁在保护哪些数据,那么你可能需要重新设计锁了。
锁最常用的地方是在一个共享的对象中。虽然 C 不是一门面向对象的语言,但我们仍然可以用它来实现面向对象的思想:现在就让我们来用一个面向对象的例子来带你认识锁的应用。假设我们已经实现了一个锁struct lock
和用来获得锁、解锁的函数void lock_acquire(struct lock *my_lock);
和void lock_unlock(struct lock *my_lock);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
struct word{
struct lock *word_lock;
char *word;
int count;
}
int count_word(struct word *my_word){
lock_acquire(my_word->word_lock);
my_word->count += 1;
lock_unlock(my_word->word_lock);
return 0;
}
int modify_word(struct word *my_word, char *new_word){
lock_acquire(my_word->word_lock);
my_word->word = realloc(my_word->word, (strlen(new_word)+1)*sizeof(char));
strcpy(my_word->word, new_word);
lock_unlock(my_word->word_lock);
return 0;
}
这里struct word中的word_lock保护了struct word中的两个成员,word和count,在我们需要修改这两个变量时,我们都必须先获得锁,然后再修改其中的值。
锁的用法是比较好理解的,现在我们需要考虑的是如何实现一个这样的锁。
你可能很容易就能想到上一节中买牛奶的例子:如果我们用一个位表示目前锁是否被持有, 0表示空闲,1表示被持有,那我们就可以在发现这一位为 时将其设为 ,成功获得锁。
但这样有一个问题:如果在一个线程检查这一位为0后立刻被内核中断、切换至下一个线程,下一个线程查看这一位为 0,获得了锁,执行临界区代码至一半,又被切换回原来的线程,原来的线程也认为锁是空闲的,因此将其设为1 ,获得锁后运行临界区。这样我们就有两个线程同时执行临界区代码,锁就形同虚设了。
为了避免上面提到的这种问题,我们希望读取和设置值能成为一个不可分割的操作,这样就没有第二个进程可以插进这两个操作之间了。这就是 测试并设置指令(Test and Set)
。这一指令由硬件提供,它会读取一个内存位置的值,然后将 存入这个位置。这样,如果我们将一个整数设为 ,然后在试图获得锁用 Test and Set 将它设为 1,在返回值为 0时成功获得锁,并在解锁时将这个值重新设为 ,这个整数就可以成为一个锁。
struct lock{
int value;
}
void lock_acquire(struct lock *my_lock){
while (test_and_set(&my_lock->value))
;
return;
}
void lock_unlock(struct lock *my_lock){
my_lock->value = 0;
return;
}
上面的代码就是一种实现锁的方法;由于不能获得锁的线程会在 while loop 中不断循环,这种实现方法被很形象地称为 自旋锁(Spinlock),。这种方法有一个显而易见的缺点,那就是它的效率很低,假设一个线程企图获得一个另一个线程已经持有的锁时,它会不断循环,浪费处理器时间。因此,自旋锁只适用于已知单个线程持有锁时间很短的情况。
为了解决上面提到的效率低的问题,我们可以把未能成功获得锁的线程加入到这个锁的等待名单上,在持有锁的线程解锁时再选择一个线程获得锁。为了实现这一功能,我们就需要使用下面的章节中我们要讲到的条件变量。
练习:对换自旋锁
条件变量
在讲到锁的时候我们已经提到,在一个锁已经被一个线程持有时,我们应该把想要获得这个锁的线程加入等待名单,然后阻塞这些线程,在解锁时从等待名单中选择一个线程解锁,从而提高运行效率。你可能已经注意到,这个过程跟我们之前提到的互斥有所不同——互斥的定义是一段代码不能同时被两个线程运行,而我们现在希望达到的目的是使得两个线程不能同时运行两段需要锁的代码,因此我们需要一个工具通知一个线程另一个线程已经解锁,然后让想要获得锁的线程按一定顺序获得锁、分别运行。这种协调多个线程运行速度的行为被称作 同步(synchronization)
在某个条件不成立时,需要该条件成立的线程可以对与这个条件相对应的条件变量调用 wait()
函数, wait()
函数会将这个线程加入等待名单,并进入阻塞态,并自动解锁;当某个线程修改这个条件涉及到的变量时,线程必须自行调用 signal()
或 broadcast()
。 signal()
和 broadcast()
的区别是 signal()
只会将等待名单上一个线程唤醒,由阻塞态变为就绪态,而broadcast()
会将等待名单上所有线程都有阻塞态变为就绪态。收到信号的线程回到就绪态的线程此时仍然处于wait()
函数中 没有返回,当内核选择继续运行该线程时,线程会获得其 原来持有的锁,然后再从wait()
函数返回。
你可能注意到 wait()
和signal()
这两个函数恰好和Unix
系统中等待子进程结束的 wait()
函数和修改进程对信号的处理方式的 signal()
函数的命名冲突了,因此我们不能直接叫它们 wait()
和 signal()
。在下面的实例中,我们会管这两个函数叫 void cond_var_wait(struct cond_var *variable,struct spinlock *lock)
和 void cond_var_signal(struct cond_var *variable)
。假设我们已经实现了这两个函数,以及一个条件变量的结构 struct cond_var
,一个自旋锁 struct spinlock
,以及用来获得自旋锁和解锁自旋锁的函数 void spinlock_acquire(struct spinlock *lock);
和void spinlock_unlock(struct spinlock *lock);
。现在我们就来看一看锁结构的实现。
struct my_lock{
int value;
struct spinlock *spin;
struct cond_var *condition;
}
我们先来定义一个数据结构。struct my_lock
是我们新定义的锁,它包含三个成员: value
与前两节中我们定义的自旋锁中的 value
功能相同,其值为 1时表示锁已被一个进程持有,其值为0 时表示锁空闲;spin
是一个自旋锁,用来保护 struct my_lock
的内部结构;condition
是一个指向我们自己定义的条件变量结构的指针,它与这个锁相匹配,用来在锁已被持有时使想要获得锁的线程等待。下面我们就来写获得锁的 void acquire_my_lock(struct my_lock *lock)
函数与解锁的void unlock_my_lock(struct my_lock *lock)
函数。
void acquire_my_lock(struct my_lock *lock){
spinlock_acquire(lock->spin);
while (lock->value) {
cond_var_wait(lock->condition);
}
lock->value = 1;
spinlock_unlock(lock->spin);
}
void unlock_my_lock(struct my_lock *lock){
spinlock_acquire(lock->spin);
lock->value = 0;
cond_var_signal(lock->condition);
spinlock_unlock(lock->spin);
}
由于我们用自旋锁保护锁的内部状态,在想要获得锁的时候,我们必须先获得锁内部的自旋锁,然后才能获得查看、修改锁的内部状态的权限。你可能会问一个问题:既然我们仍然要用自旋锁来锁住struct my_lock
的内部状态,那这种锁的效率为什么会比普通的自旋锁高呢?这个问题的答案与锁的持有时间有关。
首先,除了自旋锁以外,我们几乎只有一种其它实现锁的办法,那就是禁用中断——在禁用中断的情况下,我们不会收到系统的计时器或任何硬件发来的中断,因此我们的代码在获得锁后一定可以作为一个整体执行完成。然而,禁用中断这种做法比自旋锁更不被提倡,因为在禁用中断的过程中,我们可能失去硬件发来的 I/O 中断等重要的信息。尽管我们在少数情况下必须禁用中断,在持续时间未知的情况下我们还是不希望使用禁用中断的手段达到锁的效果。
因此,我们只有尽量缩短持有自旋锁的时间。我们无法控制一个线程持有一个锁的时间,但我们知道,一个线程花在获得锁的函数里的时间是固定且较短的。因此我们用一个自旋锁来保护锁的内部状态,而不是直接在 while loop 里反复检查锁的状态。
在上面很短的代码中,有一点我们要特别提醒你注意:我们在一个 while loop 中执行cond_var_wait(lock->condition);
,而不是在一个 if 语句后执行。我们将具体讲解这一做法的原因。
void cond_var_wait(struct cond_var *condition, struct spinlock *lock){
TCB *curr = current_thread();
struct list_elem *new_waiter = calloc(1, sizeof(struct list_elem));
new_waiter->thread = curr;
new_waiter->next = NULL;
struct list_element *temp = condition->waiters;
if (!temp) {
condition->waiters = new_waiter;
} else {
while (temp->next) {
temp = temp->next;
}
temp->next = new_waiter;
}
disable_interrupt();
spinlock_unlock(lock);
thread_block(curr);
spinlock_acquire(lock);
enable_interrupt();
}
void cond_var_signal(struct cond_var *condition){
if(!condition->waiters) return;
struct list_element *head = condition->waiters;
condition->waiters = head->next;
thread_unblock(head->TCB);
free(head);
}
条件变量理解测试
void cond_var_wait(struct cond_var *condition, struct spinlock *lock){
TCB *curr = current_thread();
struct list_elem *new_waiter = calloc(1, sizeof(struct list_elem));
new_waiter->thread = curr;
new_waiter->next = NULL;
struct list_element *temp = condition->waiters;
if (!temp) {
condition->waiters = new_waiter;
} else {
while (temp->next) {
temp = temp->next;
}
temp->next = new_waiter;
}
disable_interrupt();
spinlock_unlock(lock);
thread_block(curr);
spinlock_acquire(lock);
enable_interrupt();
}
void cond_var_signal(struct cond_var *condition){
if(!condition->waiters) return;
struct list_element *head = condition->waiters;
condition->waiters = head->next;
thread_unblock(head->TCB);
free(head);
}
信号量
在前面的章节中,我们讲解了互斥和同步这两个概念,以及用来实现互斥和同步的锁和条件变量。现在我们来讲一个介于锁和条件变量之间的工具,信号量。信号量(semaphore) 在被初始化时带有一个自定的非负整数值和一个空的等待名单,之后线程可以对这个信号量进行两个操作 P() 与 V() 。 P() 会等待信号量的值变为正数,然后将信号量的值减1 ; P() 是一个不可分割的操作,因此我们不用担心检查信号量的值变为正数后会有其它线程先把信号量的值降低到0 。 V() 会将信号量的值加 ,如果此时信号量的等待名单上有线程,则唤醒其中一个。
我们可以看到,信号量的两个操作似乎与条件变量的 wait() 和 signal() 非常相似,但它其实更适合被用来实现互斥。我们可以在初始化时将它的值设为1 ,这时 P() 就相当于 acquire_lock() , V() 就相当于 unlock_lock() 。由于初始值为1 ,且 P() 是一个不可分割的操作,我们知道同时只能有一个线程会成功地从 P() 返回,进入临界区。这个线程完成临界区代码后可以调用 V() ,使信号量的值重新变为1 ,这时下一个线程又可以进入临界区。
如果我们想要用信号量来模仿条件变量的用法,那就比较困难了。信号量与条件变量的一大区别在于条件变量 没有内部状态。比如,如果一个线程调用了 signal() ,此时没有变量在等待这个条件发生变化,那么这个 signal() 就不会产生任何影响。在条件变为不成立后,下一个来等待这个条件的线程不会因为前面曾经有线程调用过 signal() 就不等待。
信号量就不同了。如果一个信号量的初始值为0 ,一个线程在没有线程在 P() 中等待时调用了 V() ,信号量的值就会被增加至1 。这时如果有一个线程调用 P() ,则它无需经过任何等待,因为前面线程的历史被信号量保留了下来。
信号量与条件变量相比还有一个缺点,那就是条件变量在等待时会将保护共享数据的锁自动解锁,但 P()没有这个功能,因此我们一般会在调用 P() 以前解锁,否则其它线程就无法修改共享数据,造成永远等待的局面。
所幸,还是有一种可以用信号量模仿条件变量的方法的。这种方法由 Andrew Birrell 在微软 Windows 支持条件变量以前实现。下面我们写的代码没有包含有关的数据结构和函数的定义,你可以联系前几节讲的锁和条件变量的定义和这一节讲的信号量的定义、将这段代码当做伪代码来理解:
void cond_var_wait(struct cond_var *condition, struct lock *my_lock){
struct semaphore *my_sema;
semaphore_init(my_sema, 0); // 将信号量初始值设为 1
// 将信号量加入条件变量的等待名单
append_to_list(condition->waiters, my_sema);
lock_unlock(my_lock);
semaphore_P(my_sema);
lock_acquire(my_lock);
}
void cond_var_signal(struct cond_var *condition){
if (condition->waiters) {
semaphore_V(remove_from_list(condition->waiters));
}
}
从上面的代码中我们可以看出,用信号量可以实现互斥和同步的功能,但这两种用法背后的想法是截然不同的,刚刚接触多线程编程的人很容易混淆这两种用法。如果你觉得自己对于同步和互斥的概念的理解仍然不透彻,那么我们就建议你使用锁和条件变量,以巩固你对于互斥和同步的认识。
Pthread 库中的锁和条件变量
前几节中我们讲了如何实现锁和条件变量;这些知识虽然能帮助你对锁和条件变量有更深的认识,但除非你在设计一个操作系统,否则你是不需要自己实现锁和条件变量的。如果你在写一个用户级别的程序,那么你需要的就只是了解 Linux 中提供了哪些已经实现好的锁和条件变量。这一节中,我们就来讲一讲 Linux Pthread 库中包含的锁和条件变量的相关函数。
死锁
死锁的引入
顾名思义,死锁死锁肯定与锁有关,我们知道引入锁又是为了解决多进程或多线程之间的同步与互斥问题,那么到底怎样的情形才会产生死锁呢?
典型的两种死锁情形:
线程自己将自己锁住
一般情况下,如果同一个线程先后两次调用lock,在第二次调⽤用时,由于锁已经被占用,该线程会挂起等待占用锁的线程释放锁,然而锁正是被自己占用着的,该线程又被挂起而没有机会释放锁,因此 就永远处于挂起等待状态了,于是就形成了死锁(Deadlock)
。多线程抢占锁资源被困
又如线程A获 得了锁1,线程B获得了锁2,这时线程A调用lock试图获得锁2,结果是需要挂起等待线程B释放 锁2,而这时线程B也调用lock试图获得锁1,结果是需要挂起等待线程A释放锁1,于是线程A和B都 永远处于挂起状态了,死锁再次形成。
计算机系统中的死锁
1、资源分类
(一)可重用性资源和消耗性资源
可重用资源:可供用户重复使用多次的资源。
性质:
- 每一个可重用性资源中的单元只能分配给一个进程(或线程)使用,不允许多个进程(或线程)共享。
- 可重用性资源的使用顺序: 请求资源—->使用资源—->释放资源
- 系统中每一类可重用性资源中的单元数目是相对固定的,进程(或线程)在运行期间既不能创建也不能删除它。
可消耗性资源:又称临时性资源,是由进程(或线程)在运行期间动态的创建和消耗的。
性质:
- 每一类可消耗性资源的单元数目在进程(或线程)运行期间是可以不断变化的,有时可能为0.
- 进程,或线程)在运行过程中可以不断的创建可消耗性资源的单元,将它们放入该资源类的缓冲区中,以增加该资源类的单元数目。
- 进程(或线程)在运行过程中可请求若干个可消耗性资源,用于进程(或线程)自己的消耗不再将它们返回给该资源类中。
可消耗性资源通常是由生产者进程(或线程)创建,由消费者进程(或线程)消耗。
(二)可抢占性资源和不可抢占性资源
- 可抢占性资源:☞某进程(或线程)在获得该类资源后,该资源可以被其他进程(或线程)或系统抢占。
CPU和主存均属于可抢占性资源。 - 不可抢占性资源:☞系统一旦把某资源分配该进程(或线程)之后,就不能强行收回,只能在进程(或线程)用完之后自行释放。
磁带机、打印机等都属于不可抢占性资源。
引起死锁的原因
(一)竞争不可抢占资源引起死锁
如:共享文件时引起死锁
系统中拥有两个进程P1和P2,它们都准备写两个文件F1和F2。而这两者都属于可重用和不可抢占性资源。如果进程P1在打开F1的同时,P2进程打开F2文件,当P1想打开F2时由于F2已结被占用而阻塞,当P2想打开1时由于F1已结被占用而阻塞,此时就会无线等待下去,形成死锁。
(二)竞争可消耗资源引起死锁
如:进程通信时引起死锁
系统中拥有三个进程P1、P2和P3,m1、m2、m3是3可消耗资源。进程P1一方面产生消息m1,将其发送给P2,另一方面要从P3接收消息m3。而进程P2一方面产生消息m2,将其发送给P3,另一方面要从P1接收消息m1。类似的,进程P3一方面产生消息m3,将其发送给P1,另一方面要从P2接收消息m2。
如果三个进程都先发送自己产生的消息后接收别人发来的消息,则可以顺利的运行下去不会产生死锁,但要是三个进程都先接收别人的消息而不产生消息则会永远等待下去,产生死锁。
(三)进程推进顺序不当引起死锁
上图中,如果按曲线1的顺序推进,两个进程可顺利完成;如果按曲线2的顺序推进,两个进程可顺利完成;如果按曲线3的顺序推进,两个进程可顺利完成;如果按曲线4的顺序推进,两个进程将进入不安全区D中,此时P1保持了资源R1,P2保持了资源R2,系统处于不安全状态,如果继续向前推进,则可能产生死锁。
死锁的定义、必要条件和处理方法
1、死锁的定义:如果一组进程(或线程)中的每一个进程(或线程)都在等待仅由该组进程中的其他进程(或线程)才能引发的事件,那么该组进程(或线程)是死锁的(Deadlock)。
2、产生死锁的必要条件
(1)互斥条件。进程(线程)所申请的资源在一段时间内只能被一个进程(线程)锁占用。
(2)请求和保持条件。进程(线程)已经占有至少一个资源,但又提出了新的资源请求,而该资源却被其他进程(线程)占用。
(3)不可抢占条件(不可剥夺条件)。进程(线程)已获得的资源在未使用完之前不能被抢占。
(4)循环等待条件(环路等待条件)。在发生死锁时,必然存在一个进程(线程)—-资源的循环链。
3、处理死锁的方法
(1)预防死锁。破坏死锁产生的必要条件中的一个或多个。注意,互斥条件不能被破坏,否则会造成结果的不可再现性。
(2)避免死锁。在资源分匹配过程中,防止系统进入不安全区域。
(3)检测死锁。通过检测机构检测死锁的发生,然后采取适当措施解除死锁。
(4)解除死锁。在检测机构检测死锁发生后,采取适当措施解除死锁。
利用银行家算法避免死锁
1、银行家算法中的数据结构
(1)可利用资源向量Available[m]。m为系统中的资源种类数,如果向量Available[j] = K,则表示系统中Rj类资源由K个。
(2)最大需求矩阵Max[n][m]。m为系统中的资源种类数,n为系统中正在运行的进程(线程)数,如果Max[i][j] = K,则表示进程i需要Rj类资源的最大数目为K个。
(3)分配矩阵Allocation[n][m]。m为系统中的资源种类数,n为系统中正在运行的进程(线程)数,如果Allocation[i][j] = K,则表示进程i当前已分得Rj类资源的数目为K个。
(4)需求矩阵Need[n][m]。m为系统中的资源种类数,n为系统中正在运行的进程(线程)数,如果Need[i][j] = K,则表示进程i还需要Rj类资源K个。
以上三个矩阵间的关系:
Need[i][j] = Max[i][j] - Allocation[i][j]
2、银行家算法
设Request( i)是进程Pi的请求向量,如果Request(i) [j] = K,表示进程Pi需要K个Rj类型的资源。
(1)如果Request(i) [j] <= Need[i][j],转向步骤(2)。
(2)如果Request(i) [j] <= Available[j] ,转向步骤(3)。
(3)系统尝试着把资源分给进程Pi。
Available[j] = Available[j] - Request(i) [j];
Allocation[i][j] = Allocation[i][j] + Request(i) [j];
Need[i][j] = Need[i][j] - Request(i) [j];
(4)系统执行安全性算法,检查此次资源分配后系统是否处于安全状态。
3、安全性算法
(1)设置两个向量:
1》工作向量Work[m],它表示系统可提供给进程继续运行所需要的各类资源数目,初始值Work = Available。
2》Finish:它表示系统是否有足够的资源分配给进程,使其运行完成。开始时Finish[i] = false,当有足够的资源分配给进程时Finish[i] = true。
(2)从进程(线程)集合中找到一个能满足下述条件的进程(线程)。
1》Finish[i] = false
2》Need[i][j] <= Work[j],如果找到转到步骤3》,没找到转到步骤4》。
3》Work[j] = Work[j] + Allocation[i][j] ;
Finish[i] = true;
go to step 2;
4》如果所有进程(线程)的Finish[i] = true都满足,表示系统处于安全状态,反之系统处于不安全状态。
https://blog.csdn.net/hj605635529/article/details/69214903
文章摘自计算机操作系统(第四版)杨小丹书籍