一. 线程间的通信问题
多个线程共用进程空间,所以进程的全局变量对进程内的线程均可见。使用全局变量通信是线程主要通信方法。线程间通信更容易产生资源争夺,往往需要同步互斥机制保证通信安全。和multiprocessing中的Event和Lock类似,threading模块也提供了Event和Lock类来实现线程的同步互斥操作,关于二者的使用总结如下:
import threading
event = threading.Event
创建线程的Event事件。
event.wait([timeout])
如果event为设置状态则不阻塞,未设置则阻塞。
event.set()
将event变为设置状态
event.clear()
将event设置去除。
import threading
lock = threading.Lock()
创建锁。
acquire(blocking=True, timeout=None)
上锁。
参数:blocking=True,默认计数器值等于0时阻塞,传入非阻塞标志Flase时,将不再阻塞,并返回False;
操作原理 : 重复上锁会阻塞
lock.release()
解锁。
示例代码:利用锁实现同步互斥
from atexit import register
from random import randrange
from threading import Thread,Lock,currentThread
from time import sleep,ctime
class CleanOutputSet(set):
def __repr__(self):
return ', '.join(x for x in self)
lock = Lock()
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()
def mytime():
return ctime().split(' ')[3]
def loop(nsec):
lock.acquire()
remaining.add(currentThread().name)
print("{} Started {}".format(mytime(), currentThread().name))
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(currentThread().name)
print("{} Completed {}({} secs)".format(mytime(), currentThread().name, nsec))
print("Remaining thread:{}".format(remaining or 'NONE'))
lock.release()
def main():
for pause in loops:
Thread(target=loop, args=(pause,)).start()
@register
def exit():
print(mytime(), "All done!")
if __name__ == "__main__":
main()
运行结果:CleanOutputSet继承自set,重写repr方法以改变print的输出样式;字符串的join方法接收的参数类型为一个可迭代对象,生成器表达式(x for x in self)返回一个生成器对象,属于可迭代对象;randrange(start, end)产生一个start~end范围内的随机整数,(randrange(2,5) for x in range(randrange(3,7))返回一个包含36个随机数的生成器对象,且随机数的范围为24;使用atexit.register()来注册exit()函数,解释器会在脚本退出前执行该函数。所有线程在操作共享资源remaining时上锁,在睡眠时解锁。
二. 线程的应用
由于线程共享进程的资源,故线程间通信可以借助全局变量实现。当然这个全局变量也可以是具有特殊数据结构的信号量和队列。下面的2个示例分别用来说明线程和信号量的应用,以及线程和队列的应用。
2.1 线程和信号量
信号量实际就是一个计数器:分配一个单位的资源时,计数器值-1;一个单位的资源返回资源池时,计数器值+1。因此可以使用信号量来跟踪有限的资源,即用信号量表示资源可用/不可用。threading模块包含2种信号量的类:Semaphore和BoundedSemaphore,其中BoundedSemaphore提供了一个额外的功能:计数器值永远不会超过它的初始值,当计数器值满时,再release会触发ValueError异常。本例模拟一个简单的糖果机:糖果机共有5个空槽,使用信号量来跟踪有限的资源,主线程创建两个线程,模拟生产者和消费者:
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread
from time import sleep,ctime
lock = Lock()
MAX = 3
candytray = BoundedSemaphore(MAX)
def refill():
with lock:
print("Refilling candy...")
try:
candytray.release()
except ValueError:
print("Full! Skipping...")
else:
print("OK!")
def buy():
with lock:
print("Buying candy...")
if candytray.acquire(False):
print('OK!')
else:
print('Empty! Skipping...')
def producer(loops):
for i in range(loops):
refill()
sleep(randrange(3))
def consumer(loops):
for i in range(loops):
buy()
sleep(randrange(3))
@register
def atexit():
print(ctime(), "ByeBye~")
nloops = randrange(2, 4)
Thread(target=consumer, args=(randrange(nloops, nloops+MAX+2),)).start() # 模拟供不应求的情况
Thread(target=producer, args=(nloops,)).start()
运行结果:初始的BoundedSemaphore信号量为3,当信号量为3时,再执行release时,将触发valueError异常;当所有的信号量均被acquire后,再执行acquire(blocking=False)时,将立刻返回False:
2.2 线程和消息队列
模拟生产者和消费者问题:生产者向队列中投放商品,消费者从队列中获取商品:
import threading
from random import randint
from time import sleep, ctime
from queue import Queue
class MyThread(threading.Thread):
def __init__(self, func, args, name=''):
super().__init__()
self.name = name
self.func = func
self.args = args
def get_result(self):
return self.res
def run(self):
self.res = self.func(*self.args)
def writeQ(queue):
queue.put("*", 1)
print(ctime(), "+1 queue=", queue.qsize())
def readQ(queue):
val = queue.get(1)
print(ctime(), "-1 queue=", queue.qsize())
def writer(queue, loops):
for i in range(loops):
writeQ(queue)
sleep_time = randint(1, 2)
print(ctime(), 'producer will sleep', sleep_time, 's')
sleep(sleep_time)
def reader(queue, loops):
for i in range(loops):
readQ(queue)
sleep_time = randint(4, 5)
print(ctime(), 'consumer will sleep', sleep_time, 's')
sleep(sleep_time)
funcs = [writer, reader]
q = Queue(32)
threads = []
producer = MyThread(funcs[0], (q, 3), funcs[0].__name__)
consumer = MyThread(funcs[1], (q, 3), funcs[1].__name__)
threads.append(producer)
threads.append(consumer)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("ALL DONE")
运行结果:生产者线程向队列中依次加入3个 *
,消费者线程从队列中消费 *
三. 全局解释器锁
Python代码的执行,是由python虚拟机(也称为解释器主循环)控制的。虽然Python解释器中可以运行多个线程,但同一时刻只能有一个线程被解释执行。python C解释器为了保证线程安全,默认加了一把全局的锁,但对于IO操作,GIL会在IO调用前被释放,以允许其它线程在IO执行时运行,即线程遇到阻塞会让出解释器。Python线程适用于高延迟的IO操作,如网络通信;不适合cpu密集型或者传输速度很快的IO操作。
为此,作为解决方案,可以用java解释器代替C解释器,或者使用多进程程序处理CPU密集型问题。
四. 多线程和多进程
4.1 进程、线程的区别及联系
两者都是多任务编程的方式,都能够使用计算机多核资源;进程创建和删除要比线程消耗更多计算机资源;进程空间独立数据安全性好,有专门的通信方法;线程使用全局变量通信,更加简单,但是往往需要同步互斥操作;一个进程可以包含多个线程,线程共享进程资源。
4.2 应用场景
- 如果需要创建较多的并发,而任务比较简单,线程比较适合;
- 使用线程时需要考虑到同步互斥复杂程度;
- python 线程要考虑到GIL问题,CPU密集型操作使用进程,而IO密集型操作使用线程;