xv6(7)锁LOCK锁

锁LOCK锁

锁,大家应该很熟悉了,用来避免竞争,实现同步。本文以 xv6 为例来讲解锁本身是怎么实现的,废话不多说先来看一些需要了解的概念:

一些概念

公共资源:顾名思义就是被多个任务共享的资源,可以是公共内存,也可以是公共文件等等

临界区: 要访问使用公共资源,肯定得通过一些代码指令去访问,这些代码指令就是临界区

并发:单个 CPU 上交替处理多个任务,宏观上看就像是同时进行的一样,但微观上看任意时刻还是只有一个任务在进行。

并行:多个处理器同时处理多个任务,能够做到真正意义上的多个任务同时进行。

互斥:也称为排他,任何时候公共资源只允许最多一个任务独享,不允许多个任务同时执行临界区的代码访问公共资源。

竞争条件:竞争条件指的是多个任务以竞争的形式并行访问公共资源,公共资源的最终状态取决于这些任务的临界区代码的精确执行时序。

显然竞争条件并不是我们想要的,虽然一些竞争条件出现的概率很小,但根据墨菲定律,会出错的总会出错,加之计算机的运行频率,就算出错的概率再小,在某天某时某刻那也是有可能发生的。

所以对于进入临界区访问公共资源我们要避免竞争条件,保证公共资源的互斥排他性,一般有两种大的解决方案来实现互斥:

  • 忙等待:没进入临界区时一直循环,占用 CPU 资源
  • 休眠等待:没进入临界区时一直休眠,不占用 CPU,CPU 利用率较高,但有进程上下文切换的开销

那如何知道临界区能不能进,公共资源能不能访问,总得有个测试的东西,好让进程知晓现在是否进入临界区访问公共资源,这个用来测试的东西就是锁。根据上面两种大的解决方案,xv6 实现了两种锁,自旋锁和休眠锁,下面来仔细看看:

自旋锁

结构定义

struct spinlock {
  uint locked;       // Is the lock held? 该锁是否被锁上了

  // For debugging:
  char *name;        // Name of lock. 名字
  struct cpu *cpu;   // The cpu holding the lock. 哪个CPU取了锁
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock. 调用栈信息
};

有关自旋锁的结构定义如上,最重要的就是 locked 元素,用来表示该锁是否已被某 CPU 取得,1 表示该锁已某 CPU 取走,0 表示该锁空闲。其他三个元素用作为 debug 调试信息,后面用到具体再看。

相关函数

初始化

void initlock(struct spinlock *lk, char *name)  //初始化锁 lk
{
  lk->name = name;   //初始化该锁的名字
  lk->locked = 0;    //初始化该锁空闲
  lk->cpu = 0;       //初始化持有该锁的CPU为空
}

这部分就是初始化锁的名字,空闲,没有 CPU 持有该锁

"开关"中断

void pushcli(void)
{
  int eflags;
  eflags = readeflags();   //cli之前读取eflags寄存器
    
  cli();    //关中断
  if(mycpu()->ncli == 0)   //第一次pushcli()
    mycpu()->intena = eflags & FL_IF;  //记录cli之前的中断状态
  mycpu()->ncli += 1;      //关中断次数(深度)加1
}

void popcli(void)
{
  if(readeflags()&FL_IF)   //如果eflags寄存器IF位为1
    panic("popcli - interruptible");
  if(--mycpu()->ncli < 0)   //如果计数小于0
    panic("popcli");
  if(mycpu()->ncli == 0 && mycpu()->intena)  //关中断次数为0时即开中断
    sti();
}

pushcli 和 popcli 为 cli 和 sti 的封装函数,只是增加了计数,每个 popcli 与 pushcli 匹配,有多少次 pushcli 就要有多少次 popcli。

每次 pushcli 都要先读取 eflags 寄存器,然后 cli 关中断,这是因为如果是第一次 pushcli 的话需要记录在 cli 之前的中断状态到 cpu 结构体 intena 字段。

除了第一次 pushcli 和最后一次 popcli,pushcli 就是将深度计数 ncli 加 1,popcli 就是将 ncli 减 1。

每次 pushcli 都调用了 cli 来关中断,但其实只有第一次 pushcli 有实际关中断的效果,其实也不然,如果第一次 pushcli 之前本来就处于关中断,那么第一次的 pushcli 的 cli 也无甚作用。

但是 popcli 开中断的时机必须是 最后一个 popcli 也就是计数为 0,以及第一次 pushcli 之前的中断状态为允许中断,只有两者都满足时才能开中断。

为什么使用 pushcli() 和 popcli() 而不是使用 cli() sti() 后面详细说明。

取锁解锁

int holding(struct spinlock *lock)
{
  int r;
  pushcli();   
  r = lock->locked && lock->cpu == mycpu(); //检验锁lock是否被某CPU锁持有且上锁
  popcli();
  return r;
}

关中断下检查锁是否被某 CPU 取走,仔细看检查是否持有锁的条件为两个:一是锁是否被取走,二锁是否由当前 CPU 取走。

void acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))   // 如果已经取了锁
    panic("acquire");

  while(xchg(&lk->locked, 1) != 0)   //原子赋值
    ;
  __sync_synchronize();   //发出一个full barrier

  // 调试信息
  lk->cpu = mycpu();     //记录当前取锁的CPU
  getcallerpcs(&lk, lk->pcs);   //获取调用栈信息
}

关中断下进行取锁的操作,以避免死锁,原因见后面 FAQ

检查当前 CPU 是否已经持有锁,如果已持有,则 panic(),也就是说 xv6 不允许同一个 CPU 对同一个锁重复上锁。

上锁的语句为 while(xchg(\&lk \rightarrow locked, 1) != 0); xchg 函数可以看作一个原子赋值函数(本是交换,与 1 交换就相当于赋值,详见 内联汇编@@@@@),将 \&lk \rightarrow locked 赋值为 1,返回 \&lk \rightarrow locked 的旧值。也就是说如果该锁空闲没有 CPU 持有,那么当前 CPU 将其赋值为 1 表示取得该锁,xchg 返回旧值 0,跳出 while 循环。如果该锁已经被某 CPU 持有,那么 xchg 对其赋值为 1,但返回值也是 1,不满足循环跳出条件,所以一直循环等待某 CPU 释放该锁。因取锁可能需要一直循环等待,所以名为自旋锁。

__sync_synchronize() 是发出一个 full barrier,简单来说就是不允许将这条语句之前的内存读写指令放在这条之后,也不允许将这条语句之后的内存读写指令放在这条指令之前。为啥要放个屏障在这?按照正常的逻辑思维应该是该 CPU 获取到了锁才对该锁的一些 debug 信息做记录的对吧,如果不加屏障,顺序就可能颠倒。

memory barrier有几种类型:

  • acquire barrier : 不允许将barrier之后的内存读取指令移到barrier之前(linux kernel中的wmb())。
  • release barrier : 不允许将barrier之前的内存读取指令移到barrier之后 (linux kernel中的rmb())。
  • full barrier : 以上两种barrier的合集(linux kernel中的mb())。
void release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");
  
  lk->pcs[0] = 0;  //清除调试信息
  lk->cpu = 0;

  __sync_synchronize(); //发出一个full barrier

  asm volatile("movl $0, %0" : "+m" (lk->locked) : ); //lk->locked=0

  popcli();
}

解锁锁,基本就是上锁锁的逆操作了,释放锁和清除调试信息,有关内联汇编的同样还是看前文,本文不赘述。

调试信息

其他的像记录 cpu 都好说,主要是来看这个 getcallerpcs() 函数,记录调用栈的信息:

void getcallerpcs(void *v, uint pcs[])
{
  uint *ebp;
  int i;

  ebp = (uint*)v - 2;  //getcallerpcs的调用者的调用者的ebp地址
  for(i = 0; i < 10; i++){
    if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff) //停止条件
      break;
    pcs[i] = ebp[1];     // saved %eip
    ebp = (uint*)ebp[0]; // saved %ebp
  }
  for(; i < 10; i++)
    pcs[i] = 0;
}

pc[] 到底是个啥玩意儿?每次调用函数使用 call 指令的时候都会把 call 指令的下一条指令压栈,pc[] 就是存放的是这个返回地址,来看看是怎么实现的。

首先要知道函数调用的几个规则:

  • call 指令调用函数之前要先将参数压栈,方向为从右至左,先压最后一个参数,最后压第一个参数
  • call x,将 x 赋给 eip, 将下一条指令的地址压入栈中
  • 进入函数时先 push %ebp,再 movl %esp, %ebp 形成新的栈帧

所以栈中的情况大致应该是这样的:

[图片上传失败...(image-9e9fc5-1706604679726)]

每个被调用者形成的栈帧底部都是保存的调用者栈帧的 ebp,而被调用者的 ebp 指向它,所以其实各个栈帧就像是用 ebp 给串起来的,各个栈帧好比形成了一条链,每个栈帧就是一个结点,指针就是 ebp

要理清 getcallerpcs() 函数,不能只看一层调用,至少要看两层调用,举个例子:函数 A 调用 acquire(plk),acquire(plk) 调用 getcallerpcs(&plk, plk->pcs),这里我用 plk 来表示锁,只是想说明它是个指针是个地址,所以 getcallerpcs() 中的 &plk,是个二级指针,这也是这个函数能够得以实现的关键点之一,来看看栈帧什么样子的:

[图片上传失败...(image-f689f4-1706604679726)]

所以 (uint*)v-2 实际上就是 \&plk-2 也就是 acquire() 栈帧中保存的 A 函数的 ebp 的地址,有点绕,看看图应该能明白。ebp+4 是返回地址,填进 pc[] 中去,ebp=(uint*)ebp[0] 移到上上一个栈帧中去,跳过了 acquire 的栈帧。

另外有三个停止条件,第一个 ebp==0,ebp = 0 就表示后面没有调用栈帧了,但是关于这个条件我在 xv6 里面没有找到明确将 ebp 赋值 为 0 的语句,而在 jos 的 entry.S 里面是有明确的 ebp = 0 这条语句,而且还附有注释,就像我上述说的为了标识后面为空栈了。对此可能是 xv6 的一个小 bug 吧,补上就行。第二个条件 ebp 值不能在内核之下即处于用户态,getcallerpcs 的调用者,调用者的调用者都是运行在内核,所以应不会处于用户态的低地址。第三个条件 ebp==0xffffffff,最初我以为是 exec 函数调用时塞给用户栈的那个无效返回地址,但转念一想不对,getcallerpcs 运行在内核,不会与用户栈那个无效返回地址挂上钩,检测到用户态的地址时肯定会先从第二个条件跳出去。所以对此我认为,越是外层的调用栈帧处于更高的地址,0xffffffff 就是单纯地表示极限顶点,但是实际是不可能跑那么高的地址去的。

FAQ

基本函数说完,来聊聊一些遗留问题:

xv6 的竞争条件有哪些?

xv6 是个支持多处理器的系统,各个 CPU 之间可以并行执行,所以可能会出现同时访问公共资源的情况。

在单个 CPU 上,中断也可能导致并发,在允许中断时,内核代码可能在任何时候停下来,然后执行中断处理程序,内核代码和中断处理程序交叉访问公共资源也可能导致错误。所以在取锁检验锁都要在关中断下进行。

另外 xv6 不支持线程,而各个进程之间内存是不共享的,加之内核进入临界区访问公共资源的时候是关了中断的,关了中断除了自己休眠是不会让出 CPU 的,所以运行在单个处理器上的各个进程之间的并发其实并不会产生竞争条件

为什么要使用 xchg()?

试想不使用 xchg() 来实现应该怎么实现?下面代码由 xv6 文档给出:

for(;;) {
   if(!lk->locked) {    //如果没上锁
       lk->locked = 1;  //上锁
       break;     //跳出
   }
}

这种实现方式,访问 lk \rightarrow locked 和修改分为了两步,可能会出现这样的状况:CPU1 和 CPU2 接连执行到上述的 if 语句,两者都发现 lk \rightarrow locked = 0,没来得及修改 lk \rightarrow locked 来组织对方访问,那么两者都执行 lk \rightarrow locked = 1 拿到了锁,这就违反了互斥的原则。

归根揭底的原因就是因为访问和修改是分开的两步动作,而 xchg 将两者合为一个原子操作,将 lk \rightarrow locked 与 1 原子交换并返回 lk \rightarrow locked 的旧值则避免了上述问题。

acquire() 函数为什么要关中断,或者说先关中断再上锁?

对于为什么要关中断前面竞争条件简单说过,这里从死锁的角度来看,假如两者交换先上锁再关中断或者直接不关中断,若有 A 调用 acquire() 想要获得锁,当它拿到锁时刚好发生中断,中断处理程序也想要获得该锁,但 A 已经换下 CPU 了,肯定释放不了啊,那么就产生死锁。所以要先关中断再上锁。

release() 函数先原子赋值释放锁再开中断,也就同理了,如果两者交换先开中断,那么在释放锁之前可能发生中断,而中断处理程序刚好需要该锁,那么发生死锁。

关中断开中断为什么要使用 pushcli() 和 popcli() 而不直接使用 cli() 和 sti()?

前面我们已经知道如果在 CPU 持有锁的阶段发生中断,中断服务程序可能也要取锁,那么就会死锁,所以 xv6 直接决定在取锁的时候就关中断,CPU 持有锁的整个阶段都处于关中断,只有释放锁的时候才可能开中断,注意这里是可能,因为用的是 popcli()。

那么正题就来了,为什么要使用 pushcli() 和 popcli(),其实也简单,那是因为某个函数中可能要取多个锁,比如先后取了锁 1 锁 2,那么释放锁 2 之后能开中断吗?显然不能,必须等锁 1 也释放的时候才能开中断。所以使用增加了计数功能的 pushcli() 和 popcli() 来实现最后一个锁释放的时候才开中断。

指令乱序问题

现今的指令的执行都有流水线的技术,其实还有乱序执行。乱序执行指的是在 CPU 运行中的指令不按照代码既定的顺序执行,而是按照一定的策略打乱后顺序执行,以此来提高性能

不是所有的指令序列都可以打乱,没有关系的指令之间才可以打乱。这种乱序执行的确在一定程度上可以提高性能,但也会暴露一些问题,比如前面涉及开关中断的指令,对于我们人来说,这种指令与它相邻的一些指令之间肯定是有逻辑联系的,但 CPU 不知道啊,CPU 判断指令之间是否有关系只能凭借简单的逻辑,比如 B 指令需要 A 指令的结果。但是像上面那种复杂的逻辑关系它是不能判断的就可能将指令顺序错误的打乱,为避免这种情况,我们设置了屏障,禁止这个屏障前后的指令顺序打乱。

休眠锁

xv6 里面还提供了另一种锁,休眠锁,它在自旋锁的基础之上实现,定义如下:

struct sleeplock {
  uint locked;       // Is the lock held? 已锁?
  struct spinlock lk; // spinlock protecting this sleep lock 自旋锁
  
  // For debugging:
  char *name;        // Name of lock. 名字
  int pid;           // Process holding lock 哪个进程持有?
};

休眠锁配了一把自旋锁来保护,原因见后。休眠锁的意思是某个进程为了取这个休眠锁不得而休眠,所以有个 pid 来记录进程号

相关函数

休眠锁的初始化,检验是否持有锁等都类似,就不赘述了,再这里主要看看取锁和解锁的区别:

void acquiresleep(struct sleeplock *lk)
{
  acquire(&lk->lk);        //优先级:->高于&
    
  while (lk->locked) {    //当锁已被其他进程取走
    sleep(lk, &lk->lk);   //休眠
  }
  lk->locked = 1;        //上锁
  lk->pid = myproc()->pid;  //取锁进程的pid
    
  release(&lk->lk);       
}

void releasesleep(struct sleeplock *lk)
{
  acquire(&lk->lk);  //取自旋锁
  lk->locked = 0;
  lk->pid = 0;
  wakeup(lk);        //唤醒
  release(&lk->lk);
}

休眠锁要使用自旋锁来保护有两点原因,首先休眠锁的自旋锁可以使得对休眠锁的本身属性字段的操作为一个原子操作,是为保护。

另外一个原因涉及到进程的休眠唤醒,在这里先提前说说。sleep(&obj, (&obj)->lock);,这是 sleep 函数的原型,如果一个进程想要获取一个对象 obj 而不得,那么这个进程就会休眠在对象 obj 上面,有休眠就有唤醒,而这个 obj.lock 主要就是为了避免错过唤醒。这部分我会在进程那一块儿详细讲述,这里过过眼,灌灌耳音。

当前进程想要获取休眠锁,这个休眠锁就是对象,如果被别的进程取走的话,那么当前进程就取而不得,休眠在休眠锁这个对象上。如果取到了该休眠锁,就将 locked 置为 1,记录取得该锁的进程号。

解锁操作基本上就是上锁的逆操作,注意一点,可能有其他进程休眠在休眠锁上,所以当前进程解锁后需要唤醒休眠在休眠锁上的进程。

好了本节就这样吧,有什么问题还请批评指正,也欢迎大家来同我讨论交流学习进步。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容

  • 厚脸皮引流 厚脸皮引流 自旋锁是一个很神奇的东西,一个介于高效和低效之间的一个 『薛定谔』😯的互斥机制。 自旋锁的...
    gunjianpan阅读 1,898评论 0 3
  • 中断理论部分 中断是硬件和软件交互的一种机制,可以说整个操作系统,整个架构都是由中断来驱动的。中断的机制分为两种,...
    Rand_CS阅读 117评论 0 1
  • 包括 xv6 在内的大多数内核,都在交错执行多个任务。交错执行的一种原因是多处理器硬件:多 CPU 独立执行,例如...
    sarto阅读 673评论 0 0
  • 中断代码部分 本文来说码,看看中断到底是个啥样,同前面启动先来看看总图: 另外我说明一下我画的流程图啊,的确是不标...
    Rand_CS阅读 59评论 0 1
  • 启动理论部分 本节来说说捋清启动需要知道的一些东西,因知识点的确很多,涉及了各个方面,我就不像其他章节一样各个部分...
    Rand_cs1阅读 160评论 0 1