Python 多线程笔记
创建线程
1. 使用函数创建多线程
from threading import Thread
# 创建一个需要多线程执行的函数
def hell(name='Python'):
for i in range(2):
print('Hello', name)
time.sleep(1)
# 创建线程
thread_01 = Thread(target=hell)
# 启动线程
thread_01.start()
2. 使用类创建多线程
- 继承
Thread
类 - 重写
run()
方法
import time
from threading import Thread
# 继承 Thread
class MyThread(Thread):
def __init__(self, id=1):
super().__init__() # 固定写法
self.name = f'thread-{id}'
# 重写 run 方法
def run(self):
for i in range(2):
print('hello, ', self.name)
time.sleep(1)
# 获取线程对象
thread_1 = MyThread(1)
# 启动线程
thread_1.start()
Lock(锁) 机制
锁的常用方法
import threading
# 生成锁对象,全局唯一
lock = threading.Lock()
# 获取锁,获取到锁才能执行(只有一人能获取到锁)
lock.acquire()
# 释放锁,释放状态下的锁才能被获取
lock.release()
提示:获取锁后,一定要记得释放锁,不然会造成死锁的问题
with
自动获取释放锁(推介用法)
lock = threading.Lock()
# 执行 with 代码块中的代码是会自动获取锁,执行后会自动释放锁
with lock:
# 业务逻辑
pass
RLock(可重入锁)
示例
import threading
lock = threading.Lock()
# 获取锁
lock.acquire()
# out: True
# 再次获取锁
lock.acquire() # 产生死锁(因为锁已经被获取,所以会阻塞,直至锁被释放,又因为获取锁的就是此线程,该线程又被阻塞了,锁不可能被释放,产生死锁)
如果是 RLock 就不会被阻塞
import threading
rlock = threading.RLock()
rlock.acquire()
# out: True
rlock.acquire() # 并不会阻塞
# out: True
Rlock
就是为了解决了上面的问题而生的, RLock
是可重入锁(reentrant lock,acquire()
能够不被阻塞的被同一个线程调用多次。
RLock 部分源码
# 底层的获取锁方法
_allocate_lock = _thread.allocate_lock
get_ident = _thread.get_ident
class _RLock:
def __init__(self):
# 获取锁对象
self._block = _allocate_lock()
self._owner = None
self._count = 0
def acquire(self, blocking=True, timeout=-1):
# 获取当前线程表示当前线程的数字
me = get_ident()
# 如果锁的拥有者是当前线程(同一线程重复获取锁)
if self._owner == me:
# 获取锁计数 +1
self._count += 1
return 1
# 第一次获取锁
# 通过锁对象获取锁
rc = self._block.acquire(blocking, timeout)
if rc:
# 锁拥有者标记为当前线程
self._owner = me
self._count = 1
return rc
def release(self):
# 锁已被获得,但不在本线程上
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
# 获取锁计数 -1
self._count = count = self._count - 1
# 如果锁计数为 0,把锁拥有者置为 None,并释放锁
if not count:
self._owner = None
self._block.release()
第一次使用 acquire()
方法时会真正的获取一把锁(锁没有被别的线程获取时),并把获取锁的计数+ 1
,如果再获取锁,Rlock
就只增加获取的计数,
每次调用 release()
方法都会将锁计数 -1
,如果锁计数为0
时才会真正的释放锁。
_thread.allocate_lock 方法
_thread.allocate_lock(blocking=True,timeout=-1)
- 以阻塞或非阻塞的方式获取一个锁
- 当 blocking 参数为 True,调用者的线程会阻塞直到锁转为“unlocked”状态
- 当 blocking 参数为 False 时,则不会阻塞。如果没有获取到锁则会立即返回一个 False,并继续执行线程
- 当 timeout 参数为一个正的浮点数时,若发生阻塞,则至多阻塞 timeout 秒;若为 –1 (默认值),则表示会一直阻塞下去。
Condition
常用方法
import threading
# 获取 Condition 对象
cond = threading.Condition()
# 获取锁对象,跟 Lock.acquire() 类似
cond.acquire()
# 释放锁对象,跟 Lock.release() 类似
cond.release()
# 阻塞,直到 cond.notify() 方法被调用
cond.wait()
# 唤醒阻塞(前提需要获取锁)
cond.notify()
**提示:使用 cond.wait() 或者 cond.notify()
之前需要调用 cond.acquire()
方法,执行完成后记得释放锁,也就是执行 cond.release()
方法 **
使用示例
import threading
import time
class Hide(threading.Thread):
def __init__(self, cond, name):
super().__init__()
self.cond = cond
self.name = name
def run(self):
time.sleep(1)
self.cond.acquire() # 获取锁
print(f'{self.name}: 我已经把眼睛蒙上了')
self.cond.notify()
self.cond.wait()
print(f'{self.name}: 我找到你了 ~_~')
self.cond.notify()
self.cond.wait()
print(f'{self.name}: 我赢了')
self.cond.release
class Seeker(threading.Thread):
def __init__(self, cond, name):
super().__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait()
print(f'{self.name}: 我已经藏好了,你快来找我吧')
self.cond.notify()
self.cond.wait()
print(f'{self.name}: 被你找到了,哎~~~')
self.cond.notify()
self.cond.release()
if __name__ == '__main__':
cond = threading.Condition()
cond.release()
hide = Hide(cond, '小明')
seeker = Seeker(cond, '小红')
hide.start()
seeker.start()
hide.join()
seeker.join()
event 事件
threading.Event
是 Python 中的简单线程通信机制,它对Condition
进行了封装,它的功能跟 Condition
类似可以设置多个线程等待某个事件发生,当时间发生后,线程才会被激活执行。
常用的三个方法
import threading
# 获取一个事件对象
event = threading.Event()
# 重置事件
event.clear()
# 阻塞,直至 event.set() 方法执行
event.wait()
# 唤醒阻塞线程
event.set()
Event 源码
# set 方法将内部标志设置为 True
def set(self):
with self._cond:
self._flag = True
self._cond.notify_all()
# clear 方法将内部标志设置为 False
def clear(self):
with self._cond:
self._flag = False
# 如果是 True 之间返回,否者阻塞,直至超时
def wait(self, timeout=None):
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
从上面可以看出,wait()
方法会根据内部的 _flag
状态决定是否阻塞,如果 _flag == False
就会阻塞,反之则不会阻塞,而 set()
方法会将 _flag
设置成 True
, clear()
方法会将 _flag
设置成 False
Queue
queue
模块提供适用于多线程编程的先进先出(FIFO)数据结构。因为它是线程安全的,所以多个线程很轻松地使用同一个实例。
with 语句
Queue 源码中有大量的 with self.xxx:
的用法,也就是说想要看懂 Queue 的源码首先得清楚 with
这一语句的作用是什么。
首先要明白几个概念:
上下文管理协议(Context Management Protocol):在 Python 类如果想要支持该协议需要实现
__enter__()
和__exit__()
这两个方法。上下文管理器(Context Manager): 上下文管理器要负责对支持上下文管理协议的对象,进入指定代码块之前执行指定操作(
__enter__()
方法),离开指定代码块后执行指定的操作(__exit__()方法
)
whit
是 Python 中使用最广泛的上下文管理器,会非常适合需要执行成对的操作,如:打开/关闭文件,获取/释放锁,
class Queue:
def __init__(self, maxsize=0):
# 省略
self.mutex = threading.Lock()
# 获取 Condition 对象
self.not_empty = threading.Condition(self.mutex)
# 省略
def get(self, block=True, timeout=None):
# 执行前会调用 condition 的 __enter__ 方法
# 执行完毕会调用 condition 的 __exit__ 方法
with self.not_empty:
# 省略
Condition
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
def __enter__(self):
# 调用的是 Lock 对象的 __enter__() 方法
return self._lock.__enter__()
def __exit__(self, *args):
# 调用的是 Lock 对象的 __exit__() 方法
return self._lock.__exit__(*args)
Lock
class Lock:
# 获取锁
__enter__ = acquire
# 释放锁
def __exit__(self, t, v, tb):
self.release()
Queue 源码解析
'''A multi-producer, multi-consumer queue.'''
import threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
# 模块导入限制,当前文件被导入时只能导入 __all__ 内指定的属性、方法、类
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
class Empty(Exception):
pass
class Full(Exception):
pass
class Queue:
def __init__(self, maxsize=0):
# 设置队列最大容量,0 表示无限容量
self.maxsize = maxsize
self._init(maxsize)
# 获取锁对象(互斥锁)
self.mutex = threading.Lock()
# 使用锁对象,衍生的三个条件变量
# 所有 with self.这三个变量: 的代码块都是互斥的,只可能有一个执行中
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
# 表示处理完了元素,get 仅仅是表示获取到了元素
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
# 阻塞,直到所有元素处理完成
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
def qsize(self):
""" 获取队列中的元素数量 """
with self.mutex:
return self._qsize()
def empty(self):
""" 检查队列是否为空 """
with self.mutex:
return not self._qsize()
def full(self):
""" 检查队列是否已满 """
with self.mutex:
return 0 < self.maxsize <= self._qsize()
# block 表示是否阻塞
# timeout 表示超时时间,没有指定则无限等待
def put(self, item, block=True, timeout=None):
""" 添加元素到队列 """
with self.not_full:
# 检查是否设置了队列容量
if self.maxsize > 0:
if not block:
# 检查容量是否已满
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
# 已满,阻塞,等待队列任务被消费
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
# 如果队列已满,等待指定的时间,如果还是满的,抛出异常
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
# 阻塞指定时间
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify() # 唤醒 get() 方法的阻塞
# block 表示是否阻塞
# timeout 表示超时时间,没有指定则无限等待
def get(self, block=True, timeout=None):
""" 消费队列元素 """
with self.not_empty:
# 如果不等待,队列是空的直接抛异常
if not block:
if not self._qsize():
raise Empty
# 没有限制时间,等待元素加入队列
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
# 指定了等待时间,且队列是空的,等待指定之间后还是空的,抛出异常
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
# 阻塞指定时间
self.not_empty.wait(remaining)
item = self._get()
# 注意:这里并有 unfinished_tasks - 1,get 仅仅表示获取到了元素,并不代表处理完
self.not_full.notify() # 唤醒 put() 方法的阻塞
return item
def put_nowait(self, item):
'''Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
'''
return self.put(item, block=False)
def get_nowait(self):
'''Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)
# 下面的方法是为了方便重写
# 初始化队列
def _init(self, maxsize):
self.queue = deque()
# 获取队列元素数量
def _qsize(self):
return len(self.queue)
# 追加元素
def _put(self, item):
self.queue.append(item)
# 获取第一个元素
def _get(self):
return self.queue.popleft()
使用示例
from queue import Queue
from threading import Thread
import time
class Student(Thread):
def __init__(self, name, queue):
super().__init__()
self.name = name
self.queue = queue
def run(self):
while True:
# 阻塞,等待消息
msg = self.queue.get()
if msg == self.name:
print(f'{self.name}: 到!')
return
class Teacher:
def __init__(self, queue):
self.queue = queue
def call(self, student_name):
print(f'老师:{student_name}来了没')
# 想 queue 中 推送消息
self.queue.put(student_name)
if __name__ == '__main__':
queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name='小明', queue=queue)
s2 = Student(name='小红', queue=queue)
s1.start()
s2.start()
print('开始点名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小红')