Python 多线程笔记

Python 多线程笔记

创建线程

1. 使用函数创建多线程

from threading import Thread

# 创建一个需要多线程执行的函数
def hell(name='Python'):
    for i in range(2):
        print('Hello', name)
        time.sleep(1)

# 创建线程
thread_01 = Thread(target=hell)

# 启动线程
thread_01.start()

2. 使用类创建多线程

  1. 继承 Thread
  2. 重写 run() 方法
import time
from threading import Thread

# 继承 Thread
class MyThread(Thread):
    def __init__(self, id=1):
        super().__init__()  # 固定写法 
        self.name = f'thread-{id}'
    
    # 重写 run 方法
    def run(self):
        for i in range(2):
            print('hello, ', self.name)
            time.sleep(1)

# 获取线程对象
thread_1 = MyThread(1)

# 启动线程
thread_1.start()

Lock(锁) 机制

锁的常用方法

import threading

# 生成锁对象,全局唯一
lock = threading.Lock()

# 获取锁,获取到锁才能执行(只有一人能获取到锁)
lock.acquire()

# 释放锁,释放状态下的锁才能被获取
lock.release()

提示:获取锁后,一定要记得释放锁,不然会造成死锁的问题

with 自动获取释放锁(推介用法)

lock = threading.Lock()

# 执行 with 代码块中的代码是会自动获取锁,执行后会自动释放锁
with lock:
    # 业务逻辑
    pass

RLock(可重入锁)

示例

import threading
lock = threading.Lock()

# 获取锁
lock.acquire()
# out: True

# 再次获取锁
lock.acquire() # 产生死锁(因为锁已经被获取,所以会阻塞,直至锁被释放,又因为获取锁的就是此线程,该线程又被阻塞了,锁不可能被释放,产生死锁)

如果是 RLock 就不会被阻塞

import threading
rlock = threading.RLock()

rlock.acquire()
# out: True

rlock.acquire() # 并不会阻塞
# out: True

Rlock 就是为了解决了上面的问题而生的, RLock是可重入锁(reentrant lock,acquire()能够不被阻塞的被同一个线程调用多次。

RLock 部分源码

# 底层的获取锁方法
_allocate_lock = _thread.allocate_lock
get_ident = _thread.get_ident

class _RLock:

    def __init__(self):
        # 获取锁对象
        self._block = _allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        # 获取当前线程表示当前线程的数字
        me = get_ident()

        # 如果锁的拥有者是当前线程(同一线程重复获取锁)
        if self._owner == me:
            # 获取锁计数 +1
            self._count += 1
            return 1
        
        # 第一次获取锁
        # 通过锁对象获取锁
        rc = self._block.acquire(blocking, timeout)
        if rc:
            # 锁拥有者标记为当前线程
            self._owner = me
            self._count = 1
        return rc


    def release(self):
        # 锁已被获得,但不在本线程上
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        
        # 获取锁计数 -1
        self._count = count = self._count - 1
        # 如果锁计数为 0,把锁拥有者置为 None,并释放锁
        if not count:
            self._owner = None
            self._block.release()

第一次使用 acquire() 方法时会真正的获取一把锁(锁没有被别的线程获取时),并把获取锁的计数+ 1,如果再获取锁,Rlock 就只增加获取的计数,

每次调用 release() 方法都会将锁计数 -1,如果锁计数为0时才会真正的释放锁。

_thread.allocate_lock 方法

_thread.allocate_lock(blocking=True,timeout=-1)

    • 以阻塞或非阻塞的方式获取一个锁
    • 当 blocking 参数为 True,调用者的线程会阻塞直到锁转为“unlocked”状态
    • 当 blocking 参数为 False 时,则不会阻塞。如果没有获取到锁则会立即返回一个 False,并继续执行线程
    • 当 timeout 参数为一个正的浮点数时,若发生阻塞,则至多阻塞 timeout 秒;若为 –1 (默认值),则表示会一直阻塞下去。

Condition

常用方法

import threading

# 获取 Condition 对象
cond = threading.Condition()

# 获取锁对象,跟 Lock.acquire() 类似
cond.acquire()

# 释放锁对象,跟 Lock.release() 类似
cond.release()

# 阻塞,直到 cond.notify() 方法被调用
cond.wait()

# 唤醒阻塞(前提需要获取锁)
cond.notify()

**提示:使用 cond.wait() 或者 cond.notify() 之前需要调用 cond.acquire() 方法,执行完成后记得释放锁,也就是执行 cond.release()方法 **

使用示例

import threading
import time


class Hide(threading.Thread):

    def __init__(self, cond, name):
        super().__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)
        self.cond.acquire()  # 获取锁

        print(f'{self.name}: 我已经把眼睛蒙上了')
        self.cond.notify()

        self.cond.wait()
        print(f'{self.name}: 我找到你了 ~_~')
        self.cond.notify()

        self.cond.wait()
        print(f'{self.name}: 我赢了')
        self.cond.release


class Seeker(threading.Thread):

    def __init__(self, cond, name):
        super().__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()

        self.cond.wait()
        print(f'{self.name}: 我已经藏好了,你快来找我吧')
        self.cond.notify()

        self.cond.wait()
        print(f'{self.name}: 被你找到了,哎~~~')
        self.cond.notify()
        self.cond.release()


if __name__ == '__main__':
    cond = threading.Condition()
    cond.release()
    hide = Hide(cond, '小明')
    seeker = Seeker(cond, '小红')

    hide.start()
    seeker.start()

    hide.join()
    seeker.join()

event 事件

threading.Event 是 Python 中的简单线程通信机制,它对Condition进行了封装,它的功能跟 Condition 类似可以设置多个线程等待某个事件发生,当时间发生后,线程才会被激活执行。

常用的三个方法

import threading

# 获取一个事件对象
event = threading.Event()

# 重置事件
event.clear()

# 阻塞,直至 event.set() 方法执行
event.wait()

# 唤醒阻塞线程
event.set()

Event 源码

    # set 方法将内部标志设置为 True
    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    # clear 方法将内部标志设置为 False
    def clear(self):
        with self._cond:
            self._flag = False

    # 如果是 True 之间返回,否者阻塞,直至超时
    def wait(self, timeout=None):
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

从上面可以看出,wait() 方法会根据内部的 _flag状态决定是否阻塞,如果 _flag == False 就会阻塞,反之则不会阻塞,而 set() 方法会将 _flag 设置成 Trueclear() 方法会将 _flag 设置成 False

Queue

queue 模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。

with 语句

Queue 源码中有大量的 with self.xxx: 的用法,也就是说想要看懂 Queue 的源码首先得清楚 with 这一语句的作用是什么。

首先要明白几个概念:

  1. 上下文管理协议(Context Management Protocol):在 Python 类如果想要支持该协议需要实现 __enter__()__exit__() 这两个方法。

  2. 上下文管理器(Context Manager): 上下文管理器要负责对支持上下文管理协议的对象,进入指定代码块之前执行指定操作(__enter__() 方法),离开指定代码块后执行指定的操作(__exit__()方法

whit 是 Python 中使用最广泛的上下文管理器,会非常适合需要执行成对的操作,如:打开/关闭文件,获取/释放锁,

class Queue:

    def __init__(self, maxsize=0):
        # 省略
        self.mutex = threading.Lock()
        # 获取 Condition 对象
        self.not_empty = threading.Condition(self.mutex)
        # 省略
        
    def get(self, block=True, timeout=None):
        # 执行前会调用 condition 的 __enter__ 方法
        # 执行完毕会调用 condition 的 __exit__ 方法
        with self.not_empty:
            # 省略

Condition

class Condition:
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self._lock = lock
 
    def __enter__(self):
        # 调用的是 Lock 对象的 __enter__() 方法
        return self._lock.__enter__()

    def __exit__(self, *args):
        # 调用的是 Lock 对象的 __exit__() 方法
        return self._lock.__exit__(*args)

Lock

class Lock:
    # 获取锁
    __enter__ = acquire
    # 释放锁
    def __exit__(self, t, v, tb):
        self.release()

Queue 源码解析

'''A multi-producer, multi-consumer queue.'''

import threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time

# 模块导入限制,当前文件被导入时只能导入 __all__ 内指定的属性、方法、类
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']


class Empty(Exception):
    pass


class Full(Exception):
    pass


class Queue:

    def __init__(self, maxsize=0):
        # 设置队列最大容量,0 表示无限容量
        self.maxsize = maxsize
        self._init(maxsize)

        # 获取锁对象(互斥锁)
        self.mutex = threading.Lock()

        # 使用锁对象,衍生的三个条件变量
        # 所有 with self.这三个变量: 的代码块都是互斥的,只可能有一个执行中
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)

        self.unfinished_tasks = 0

    # 表示处理完了元素,get 仅仅是表示获取到了元素
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    # 阻塞,直到所有元素处理完成
    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """ 获取队列中的元素数量 """
        with self.mutex:
            return self._qsize()

    def empty(self):
        """ 检查队列是否为空 """
        with self.mutex:
            return not self._qsize()

    def full(self):
        """ 检查队列是否已满 """
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()
    
    # block 表示是否阻塞
    # timeout 表示超时时间,没有指定则无限等待
    def put(self, item, block=True, timeout=None):
        """ 添加元素到队列 """

        with self.not_full:
            # 检查是否设置了队列容量
            if self.maxsize > 0:
                if not block:
                    # 检查容量是否已满
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        # 已满,阻塞,等待队列任务被消费
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # 如果队列已满,等待指定的时间,如果还是满的,抛出异常
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        # 阻塞指定时间
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()  # 唤醒 get() 方法的阻塞


    # block 表示是否阻塞
    # timeout 表示超时时间,没有指定则无限等待
    def get(self, block=True, timeout=None):
        """ 消费队列元素 """
        
        with self.not_empty:
            # 如果不等待,队列是空的直接抛异常
            if not block:
                if not self._qsize():
                    raise Empty
            # 没有限制时间,等待元素加入队列
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            # 指定了等待时间,且队列是空的,等待指定之间后还是空的,抛出异常
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    # 阻塞指定时间
                    self.not_empty.wait(remaining)
            item = self._get()
            # 注意:这里并有 unfinished_tasks - 1,get 仅仅表示获取到了元素,并不代表处理完 
            self.not_full.notify()  # 唤醒 put() 方法的阻塞
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # 下面的方法是为了方便重写

    # 初始化队列
    def _init(self, maxsize):
        self.queue = deque()

    # 获取队列元素数量
    def _qsize(self):
        return len(self.queue)

    # 追加元素
    def _put(self, item):
        self.queue.append(item)

    # 获取第一个元素
    def _get(self):
        return self.queue.popleft()

使用示例

from queue import Queue
from threading import Thread
import time


class Student(Thread):

    def __init__(self, name, queue):
        super().__init__()
        self.name = name
        self.queue = queue

    def run(self):
        while True:
            # 阻塞,等待消息
            msg = self.queue.get()
            if msg == self.name:
                print(f'{self.name}: 到!')
                return


class Teacher:

    def __init__(self, queue):
        self.queue = queue

    def call(self, student_name):
        print(f'老师:{student_name}来了没')
        # 想 queue 中 推送消息
        self.queue.put(student_name)


if __name__ == '__main__':
    queue = Queue()
    teacher = Teacher(queue=queue)
    s1 = Student(name='小明', queue=queue)
    s2 = Student(name='小红', queue=queue)

    s1.start()
    s2.start()

    print('开始点名~')
    teacher.call('小明')
    time.sleep(1)
    teacher.call('小红')

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354