Python线程锁的实现

Python 线程锁的实现

Lock 的实现

锁只有两种状态,锁定或者未锁定

Lock = _allocate_lock

_allocate_lock = thread.allocate_lock

thread.allocate_lock 是用C代码实现的,代码位置 Python/thread_pthread.h

假设我们的系统支持 POSIX semaphores

首先看下 sem_init 的原型

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

pshared决定了这个信号量是在进程中共享还是在线程中共享。

  • pshared 为 非零值,那么不同进程中都可以共享
  • pshared 为 零值,那么在当前进程的线程中共享。

https://svn.python.org/projects/python/trunk/Python/thread_pthread.h

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    /* 申请内存 */
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        /*
        初始化
        value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
        */
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

Acquire

// waitflag 默认为 true
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    int success;
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

    do {
        if (waitflag)
            //默认执行到这里
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */

    if (waitflag) {
        CHECK_STATUS("sem_wait");
    } else if (status != EAGAIN) {
        CHECK_STATUS("sem_trywait");
    }

    success = (status == 0) ? 1 : 0;

    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
    return success;
}

Release

void
PyThread_release_lock(PyThread_type_lock lock)
{
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_release_lock(%p) called\n", lock));
    // sem_post 是关键,释放锁
    status = sem_post(thelock);
    CHECK_STATUS("sem_post");
}

RLock 的实现

RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire 可以被同一个线程(进程)多次无阻塞调用。但是 release 必须被匹配的使用。

下面可以看到 RLock 不过是一个浅包装

def RLock(*args, **kwargs):
    return _RLock(*args, **kwargs)

RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,

class _RLock():
    def __init__(self):
        # 内部使用的 一个锁
        self.__block = _allocate_lock()
        # __owner 用来保存 acquire 成功时的线程 id
        self.__owner = None
        # acquire被重复调用的次数
        self.__count = 0

python3 的实现

python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。


下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等

Condition 的实现

多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。

一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。

先看下如何使用

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

queue = []


def consumer(cv, q):
    logging.debug('Consumer thread started ...')
    while True:
        with cv:
            while not q:
                logging.debug("Nothing in queue, consumer is waiting")
                cv.wait()
            num = q.pop(0)
            logging.debug("Consumed %s", num)
            time.sleep(random.randint(1,3))


def producer(cv, q):
    logging.debug('Producer thread started ...')
    while True:
        with cv:
            nums = range(5)
            num = random.choice(nums)
            q.append(num)
            logging.debug("Produced %s", num)
            cv.notify_all()


if __name__ == '__main__':
    condition = threading.Condition()
    for i in range(10):
        threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
    pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
    pd.start()

下面看如何实现

class _Condition:
    def __init__(self, lock=None, verbose=None):
        # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
        self.acquire = lock.acquire
        self.release = lock.release
        # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
        #......
        # 这个很重要,保存了等待在这个Condition上的信息
        self.__waiters = []

下面看 wait方法,为了篇幅,省略了部分代码

    def wait(self, timeout=None):
        # 必须先成功调用acquire方法,才能调用wait
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # 生成一个锁,并调用 acquire,使得它处于 locked 状态
        # 这个锁代表一个waiter
        waiter = _allocate_lock()
        waiter.acquire()
        # 保存起来
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 再次调用 acquire 方法,等待锁被释放
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # 。。。。。。
        finally:
            # 必须恢复锁原来的状态,这个方法很重要
            self._acquire_restore(saved_state)

再看下 notify方法

    def notify(self, n=1):
        # 同样,必须得调用 acquire成功,才可以调用本方法
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")

        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in waiters:
            # 调用 锁上的 release 方法,使得等待者可以继续
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

寻工作

本人正在找工作。地点深圳。请联系我微信 sunfriend

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容

  • 线程 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0....
    不浪漫的浪漫_ea03阅读 362评论 0 0
  • 概述 这篇博客是我翻译Python threads synchronization: Locks, RLocks,...
    0行痴0阅读 1,455评论 0 8
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,257评论 4 56
  • 你爱这座城市街道的繁华, 也恨它拥挤的交通。 你爱它绚丽的灯光, 也恨它孤独的夜晚。 . 你爱这座城市的每一个舞台...
    刘海峰6阅读 1,576评论 1 2
  • 那天,我收到高中同学的邀请,去某某KTV,我记得我高中的时候没什么朋友啊,本身也不是招人喜欢的类型。虽然有些疑惑...
    BF丶凌碎阅读 443评论 0 0