- 线程状态
新建,就绪,运行,阻塞,死亡。 - 线程同步
多线程可以同时运行多个任务,线程需要共享数据的时候,可能出现数据不同步的问题,用最习惯的LabVIEW来说,多线程中的2个循环如果一个对数组进行写入,另一个进行读取,就有可能导致还没写入就要读取,为了避免这种情况,就要给线程加锁。
锁有两种状态-锁定和未锁定。每当一个线程要访问共享数据的时候,必须先获得锁定,如果有别的线程已经获得锁定了,那么该线程就得暂停,这个称之为同步阻塞。等到另外的线程访问完毕之后,释放锁,该线程才能继续访问。 - 线程通信
另外一种可能就是数组还没有创建,写入线程和读取线程肯定不能运行,所以必须等待创建线程的通知。所以引入了条件变量。
条件变量允许线程在条件不满足的时候等待,条件满足的时候开始运行。 - 线程运行和阻塞的状态转换
同步阻塞指处于竞争锁定的状态,线程请求锁定时进入这个状态,一旦成功获得锁定时又恢复到运行状态。
等待阻塞是指等待其他线程通知的状态,线程获得条件锁定后,调用等待将进入这个状态,一旦其他线程发出通知,线程将进入同步阻塞状态,再次竞争条件锁定。
其他阻塞是指调用time.sleep(),anotherthread.join()或者等待IO时的阻塞,这个状态下线程不会释放已获得的锁定。 - thread模块
import thread
import time
#在线程中执行的函数
def func():
for i in range(5):
print 'func'
time.sleep(1)
#结束当前线程
thread.exit()
#与thread.exit_thread()等价
#func返回的时候,线程同样会结束
#启动线程,线程立即开始运行
thread.start_new(func,())
#这个方法和thread.start_new_thread()等价
#第一个参数是方法,第二个参数是方法的参数,如果没有参数,传入空的元组
#创建锁
lock = thread.allocate()
#这个方法和thread.allocate_lock()等价
#判断锁的状态,锁定还是释放
print lock.locked()
#锁通常用于控制对共享资源的访问
count = 0
#获得锁,成功获得锁定后返回True
#可选的timeout参数不填将一直阻塞到获得锁定
#否则超时后返回False
if lock.acquire():
print "True"
count += 1
#释放锁
lock.release()
#thread模块提供的线程都在主线程结束后同时结束
time.sleep(6)```
`thread.interrupt_main()`:在其他线程中终止主线程。
6. threading模块
锁Lock和条件变量Condition在Python中是独立对象。
`threading.currentThread()`:返回当前的线程变量
`threading.enumerate()`:返回一个正在运行的线程的list,指线程启动后,结束前。
`threading.activeCount()`:返回正在运行的线程数量,`len(threading.enumerate())`有相同的结果。
threading提供的类,Thread,Lock,Rlock,Condition,[Bounded]Semaphore,Event,Timer,local
1. Thread
import threading
开启线程,将要执行的函数作为参数传给Thread的构造方法
def func():
print "func() passed to Thread"
t = threading.Thread(target=func)
t.start()
从Thread继承,重写run(),在线程开始运行的时候就会运行run
class MyThread(threading.Thread):
def run(self):
print "MyThread extended from Thread"
t = MyThread()
t.start()```
isAlive()
:返回线程是否在运行。
get/setName(name)
:获取/设置线程名
is/setDaemon(bool):
获取/设置是否守护线程
start()
启动线程
join([timeout])
:阻塞当前上下文的线程,直到调用此方法的线程终止或者到达指定的timeout
#join()的一个用法
import threading
import time
def context(tJoin):
print "in threadContext"
#启动参数传递过来的线程
tJoin.start()
#阻塞tContext直到thread tJoin线程终止
tJoin.join()
#tJoin结束后继续运行
print "out threadContext"
def joinfunc():
print "in threadJoin"
time.sleep(4)
print 'out threadJoin'
#创建线程tJoin,在tContext线程中运行,然后阻塞tContext
#tJoin运行完之后,继续tContext线程
tjoin = threading.Thread(target=joinfunc)
tContext = threading.Thread(target=context,args=(tjoin,))
tContext.start()```
2. Lock
Lock指令锁是可用的最低级的同步指令,Lock处于锁定状态时候,不被特定的线程拥有,Lock包含两种状态锁定和非锁定,以及两个基本方法。
可以认为Lock又一个锁定池,当线程请求锁定时,将线程置于池中,直到获得锁定后出池。池中的线程处于同步阻塞的状态。
acquire([timeout])使得线程进入同步阻塞状态,尝试获得锁定,也就是用在哪个线程,哪个线程就处于阻塞,直到获得锁定,获得锁定后需要释放锁
release()释放锁,使用前当前线程必须已经获得锁定
import threading
import time
data = 0
lock = threading.Lock()
def func():
global data
print "%s acquire lock..."% threading.currentThread().getName()
#调用acquire()时候,线程一直阻塞,直到获得锁定
#返回是否获得锁
if lock.acquire():
print "%s get the lock"% threading.currentThread().getName()
data += 1
time.sleep(2)
print "%s release lock..."%threading.currentThread().getName()
lock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()```
终于弄懂了Lock()这块,acquire(),release()的用法
- RLock
RLock可重入锁是一个可以被同一个线程请求多次的同步指令,处于锁定状态时,RLock被某个线程拥有,拥有RLock的线程可以在此调用acquire(),释放锁的时候需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用acquire()/release(),计数器加一或者减一,为0的时候锁处于未锁定状态。
import threading
import time
rlock = threading.RLock()
def func():
#第一次请求锁定
print '%s acquire lock..'%threading.currentThread().getName()
if rlock.acquire():
print "%s get the lock"%threading.currentThread().getName()
time.sleep(2)
#第二次请求锁定
print "%s acquire lock again.."%threading.currentThread().getName()
if rlock.acquire():
print "%s get the lock"%threading.currentThread().getName()
time.sleep(2)
#第一次释放锁
print "%s release lock "%threading.currentThread().getName()
rlock.release()
time.sleep(2)
#第二次释放锁
print "%s release lock "%threading.currentThread().getName()
rlock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()```
4. Condition
Condition通常和一个锁关联,需要在多个Condition中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例
除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知,得到通知后线程进入锁定池等待锁定。
acquire()/release():调用关联的锁的相应方法
wait():调用这个方法将使线程进入Condition的等待池等待通知,并且释放锁。使用前线程必须已经获得锁定。
notify()调用这个方法将从等待池中挑选一个线程并且通知,收到通知的线程将自动调用acquire()尝试获得锁定。其他线程仍然在等待池中,调用这个方法不回释放锁定,使用前线程已获得锁定。
notifyAll()调用这个方法将通知等待池中所有线程,这些线程都将进入锁定池尝试获得锁定,调用这个方法不会释放锁定,使用前必须已获得锁定。
这两个方法都是用于已经获得锁定的线程。
import threading
import time
product = None
con = threading.Condition()
生产者方法
def produce():
global product
#该进程获得锁
if con.acquire():
while True:
if product is None:
print "produce..."
product = "anything"
#通知消费者,商品已经生产
con.notify()
#使得这个线程进入等待阻塞,并且释放锁
con.wait()
time.sleep(2)
消费者
def consume():
global product
#尝试该线程加锁
if con.acquire():
while True:
if product is not None:
print "consume..."
product = None
#通知
con.notify()
con.wait()
time.sleep(2)
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()```
很有意思的小程序啊
- Semaphore/BoundedSemaphore
信号量,管理一个内置的计数器,每当调用acquire()时候-1,调用release()时候+1,计数器不能小于0.当计数器为0时候,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()
Semaphore经常来同步一些油访客上限的对象,如连接池。
BoundedSemaphore与Semaphore的唯一区别是前者将在调用release()时候检查计数器的值是否超过了计数器的初始值。
Semaphore(value=1):value是计数器的初始值。
acquire()请求Semaphore,如果计数器为0,将阻塞线程至同步阻塞状态,否则将计数器-1并立即返回
release()释放Semaphore,将计数器+1,如果使用BoundedSemaphore,还将进行释放次数检查,release()方法不检查线程是否已经获得Semaphore
import threading
import time
semaphore = threading.BoundedSemaphore(2)
def func():
#请求Semaphore,成功后计数器-1,计数器为0的时候阻塞
print "%s acquire semaphore..."%threading.currentThread().getName()
if semaphore.acquire():
print "%s get semaphore"%threading.currentThread().getName()
time.sleep(4)
#释放Semaphore,计数器+1
print '%s release semaphore'%threading.currentThread().getName()
semaphore.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()
time.sleep(2)
#没有获得semaphore的主线程也可以调用release
#如果使用BoundedSemaphore,主线程释放后将抛出异常
print "mainthread release semaphore without acquire"
semaphore.release()```
想起了复习操作系统时候的信号量啊!
6. Event
Event是最简单的线程通信机制,一个线程通知事件,其他线程等待事件,Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为False,wait()将阻塞线程至等待阻塞状态。
Event没有锁,无法使线程进入同步阻塞状态。
isSet()当内置标志为True时候返回True
set()将标志设为True,并且通知所有处于等待阻塞状态的线程恢复运行状态。
clear()将标志设为False
wait()如果标志为True则立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()
import threading
import time
event = threading.Event()
def func():
#等待事件,进入等待阻塞状态
print "%s wait for event"%threading.currentThread().getName()
#如果标志为True立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()
event.wait()
#收到事件后进入运行状态
print "%s recv event"%threading.currentThread().getName()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
time.sleep(2)
发送事件通知
print "mainthread set event"
将标志设为True,通知所有处于等待阻塞状态的线程恢复运行状态
event.set()```
- Timer
Timer定时器是Thread的派生类,用于在指定时间后调用一个方法
Timer(interval,function,args,kwargs)
interval指定的时间,function要执行的方法
import threading
def func():
print "hello timer!"
timer = threading.Timer(5,func)
timer.start()```
8. local
local是一个小写字母开头的类,用于管理线程局部的数据,对于同一个local,线程无法访问其他线程设置的属性,线程设置的属性不会被其他线程设置的同名属性替换。
可以把local看成一个线程属性字典的字典,local封装了从自身使用线程作为key检索对应的属性字典,再使用属性名作为key检索属性值的细节。
import threading
local = threading.local()
local.tname = 'main'
def func():
local.tname = 'not main'
print local.tname
t1 = threading.Thread(target=func)
t1.start()
阻塞主线程,直到t1线程运行完毕
t1.join()
print local.tname```
一个例子
import threading
alist = None
condition = threading.Condition()
def doSet():
global alist
print "%s acquire lock"%threading.currentThread().getName()
if condition.acquire():
print '%s get lock'%threading.currentThread().getName()
while alist is None:
condition.wait()
print "%s wait notify"%threading.currentThread().getName()
print "%s get notify"%threading.currentThread().getName()
for i in range(len(alist))[::-1]:
alist[i] = i
print "%s release lock"%threading.currentThread().getName()
condition.release()
def doPrint():
global alist
if condition.acquire():
while alist is None:
condition.wait()
for i in alist:
print i,
print
condition.release()
def doCreate():
global alist
if condition.acquire():
if alist is None:
alist = [i for i in range(10)]
condition.notifyAll()
condition.release()
tset = threading.Thread(target=doSet,name='tset')
tprint = threading.Thread(target=doPrint,name='tprint')
tcreate = threading.Thread(target=doCreate,name='tcreate')
tset.start()
tprint.start()
tcreate.start()```