Python线程锁的实现

Python 线程锁的实现

Lock 的实现

锁只有两种状态,锁定或者未锁定

Lock = _allocate_lock

_allocate_lock = thread.allocate_lock

thread.allocate_lock 是用C代码实现的,代码位置 Python/thread_pthread.h

假设我们的系统支持 POSIX semaphores

首先看下 sem_init 的原型

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

pshared决定了这个信号量是在进程中共享还是在线程中共享。

  • pshared 为 非零值,那么不同进程中都可以共享
  • pshared 为 零值,那么在当前进程的线程中共享。

https://svn.python.org/projects/python/trunk/Python/thread_pthread.h

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    /* 申请内存 */
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        /*
        初始化
        value 为1,表明这个锁是 unlocked,被该进程的所有线程共享
        */
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

Acquire

// waitflag 默认为 true
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    int success;
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_acquire_lock(%p, %d) called\n", lock, waitflag));

    do {
        if (waitflag)
            //默认执行到这里
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */

    if (waitflag) {
        CHECK_STATUS("sem_wait");
    } else if (status != EAGAIN) {
        CHECK_STATUS("sem_trywait");
    }

    success = (status == 0) ? 1 : 0;

    dprintf(("PyThread_acquire_lock(%p, %d) -> %d\n", lock, waitflag, success));
    return success;
}

Release

void
PyThread_release_lock(PyThread_type_lock lock)
{
    sem_t *thelock = (sem_t *)lock;
    int status, error = 0;

    dprintf(("PyThread_release_lock(%p) called\n", lock));
    // sem_post 是关键,释放锁
    status = sem_post(thelock);
    CHECK_STATUS("sem_post");
}

RLock 的实现

RLock表示的是 reentrant lock,如果该锁已经被获取,那么acquire 可以被同一个线程(进程)多次无阻塞调用。但是 release 必须被匹配的使用。

下面可以看到 RLock 不过是一个浅包装

def RLock(*args, **kwargs):
    return _RLock(*args, **kwargs)

RLock 内部保存了一个普通的锁(thread.allocate_lock 生成),同时保存了 这个锁的 owner,

class _RLock():
    def __init__(self):
        # 内部使用的 一个锁
        self.__block = _allocate_lock()
        # __owner 用来保存 acquire 成功时的线程 id
        self.__owner = None
        # acquire被重复调用的次数
        self.__count = 0

python3 的实现

python3 会判断系统是否支持 reentrant lock,如果支持则用系统的,否则用 python 代码实现一个。


下面我们将看到,如何只用一个 Lock来实现其他的同步机制, Condition, Event, Semaphore等

Condition 的实现

多个线程可以用 condition 来等待同一个事件的发生,当一个事件发生后,所有等待的线程都可以得到通知。

一个 Condition 总是和一个锁关联在一起的。可以传递一个锁,也可以由 构造函数自己创建一个。

先看下如何使用

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

queue = []


def consumer(cv, q):
    logging.debug('Consumer thread started ...')
    while True:
        with cv:
            while not q:
                logging.debug("Nothing in queue, consumer is waiting")
                cv.wait()
            num = q.pop(0)
            logging.debug("Consumed %s", num)
            time.sleep(random.randint(1,3))


def producer(cv, q):
    logging.debug('Producer thread started ...')
    while True:
        with cv:
            nums = range(5)
            num = random.choice(nums)
            q.append(num)
            logging.debug("Produced %s", num)
            cv.notify_all()


if __name__ == '__main__':
    condition = threading.Condition()
    for i in range(10):
        threading.Thread(name='consumer%s' % i, target=consumer, args=(condition, queue)).start()
    pd = threading.Thread(name='producer', target=producer, args=(condition, queue))
    pd.start()

下面看如何实现

class _Condition:
    def __init__(self, lock=None, verbose=None):
        # 必须关联一个 Lock,如果没有的话,则自己创建一个 RLock
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # 可以在 Condition上调用 acquire() and release() 方法,实际是调用的是内部锁的方法
        self.acquire = lock.acquire
        self.release = lock.release
        # 如果锁定义了 _release_save _acquire_restore _is_owned 方法,那么使用之,否则用自己定义的
        #......
        # 这个很重要,保存了等待在这个Condition上的信息
        self.__waiters = []

下面看 wait方法,为了篇幅,省略了部分代码

    def wait(self, timeout=None):
        # 必须先成功调用acquire方法,才能调用wait
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # 生成一个锁,并调用 acquire,使得它处于 locked 状态
        # 这个锁代表一个waiter
        waiter = _allocate_lock()
        waiter.acquire()
        # 保存起来
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 再次调用 acquire 方法,等待锁被释放
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # 。。。。。。
        finally:
            # 必须恢复锁原来的状态,这个方法很重要
            self._acquire_restore(saved_state)

再看下 notify方法

    def notify(self, n=1):
        # 同样,必须得调用 acquire成功,才可以调用本方法
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")

        __waiters = self.__waiters
        waiters = __waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        self._note("%s.notify(): notifying %d waiter%s", self, n,
                   n!=1 and "s" or "")
        for waiter in waiters:
            # 调用 锁上的 release 方法,使得等待者可以继续
            waiter.release()
            try:
                __waiters.remove(waiter)
            except ValueError:
                pass

寻工作

本人正在找工作。地点深圳。请联系我微信 sunfriend

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 线程 引言&动机 考虑一下这个场景,我们有10000条数据需要处理,处理每条数据需要花费1秒,但读取数据只需要0....
    不浪漫的浪漫_ea03阅读 2,865评论 0 0
  • 概述 这篇博客是我翻译Python threads synchronization: Locks, RLocks,...
    0行痴0阅读 5,317评论 0 8
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 13,872评论 4 56
  • 你爱这座城市街道的繁华, 也恨它拥挤的交通。 你爱它绚丽的灯光, 也恨它孤独的夜晚。 . 你爱这座城市的每一个舞台...
    刘海峰6阅读 5,435评论 1 2
  • 那天,我收到高中同学的邀请,去某某KTV,我记得我高中的时候没什么朋友啊,本身也不是招人喜欢的类型。虽然有些疑惑...
    BF丶凌碎阅读 3,294评论 0 0