threading模块提供了管理多个线程执行的API。
最简单的用法。就是用一个目标函数实例化一个Thread对象。start()开始工作,join()等待当前线程完成。
import threading
def work():
print("working")
for i in range(5):
t = threading.Thread(target=work)
t.start()
print("please wait!")
t.join()
结果
working
please wait!
working
please wait!
working
please wait!
working
please wait!
working
please wait!
当然也可以传递参数。
import threading
def work(i):
print("%i is working" %i)
for i in range(5):
t = threading.Thread(target=work,args=(i,))
t.start()
print("please wait!")
t.join()
结果
0 is working
please wait!
1 is working
please wait!
2 is working
please wait!
3 is working
please wait!
4 is working
please wait!
确定当前线程
每个Thread实例都有一个带有默认值的名字,也可以传入name参数更改。
import threading
import time
def work():
print(threading.current_thread().getName(),"starting!")
time.sleep(0.5)
print(threading.current_thread().getName(),"end!")
t = threading.Thread(name="worker", target=work)
t.start()
print("please wait!")
t.join()
结果
worker starting!
please wait!
worker end!
出于线程安全的考虑,我们下面用logging模块输出消息。
守护线程与非守护线程
一般来说,程序都会等待所有线程完成工作之后才退出。而守护线程可以一直运行而不阻塞主程序的退出。传入daemon=True或者调用setDaemon()方法并提供参数True来构造守护线程。
import threading
import time
import logging
def daemon():
logging.debug("start")
time.sleep(0.5)
logging.debug("exite")
def non_deamon():
logging.debug("start")
logging.debug("exit")
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='deamon', target=daemon, daemon=True)
t = threading.Thread(name="non-deamon", target=non_deamon)
d.start()
t.start()
结果中没有看到守护线程的退出的消息,主程序就已经退出了。
(deamon ) start
(non-deamon) start
(non-deamon) exit
要等待一个守护线程结束,需要使用join()方法。默认下,join会无限阻塞,但可以传入浮点值。在时间内线程即使未完成,join也会强制返回。
import threading
import time
import logging
def daemon():
logging.debug("start")
time.sleep(0.5)
logging.debug("exite")
def non_deamon():
logging.debug("start")
logging.debug("exit")
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='deamon', target=daemon, daemon=True)
t = threading.Thread(name="non-deamon", target=non_deamon)
d.start()
d.join(0.2)
t.start()
结果还是没有看到守护线程的结束退出的消息。
(deamon ) start
(non-deamon) start
(non-deamon) exit
枚举所有线程
threading.enumerate()会返回一个Thread实例的一个列表。由于等待当前主程序终止会引入一种死锁的情况,所以跳过这个线程等待。注意根据电脑配置,合理调节线程睡眠时间,否则会由于缺少logging参数而报错。
import threading
import time
import logging
import random
def worker():
pause = random.randint(1, 5) / 2
logging.debug("sleeping %0.2f" % pause)
time.sleep(pause)
logging.debug("end!")
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
t = threading.Thread(target=worker, daemon=True)
t.start()
# 拿到当前主线程
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is main_thread:
continue
logging.debug("joining %s" % t.getName())
t.join()
上述程序各个线程执行的顺序image
输出顺序可能不一样,
(Thread-1 ) sleeping 0.50
(Thread-2 ) sleeping 1.50
(Thread-3 ) sleeping 2.00
(MainThread) joining Thread-1
(Thread-1 ) end!
(MainThread) joining Thread-2
(Thread-2 ) end!
(MainThread) joining Thread-3
(Thread-3 ) end!
派生线程
简单的示例
import threading
import logging
class Mythread(threading.Thread):
def run(self):
logging.debug("running")
logging.basicConfig(
level=logging.DEBUG,
format="(%(threadName)s) %(message)s"
)
for i in range(5):
t = Mythread()
t.start()
结果
(Thread-1) running
(Thread-2) running
(Thread-3) running
(Thread-4) running
(Thread-5) running
如果要子类像父类(threading.Thread)那样传递参数,需要重新定义构造函数
import threading
import logging
class Mythread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug("running with %s and %s" % (self.args, self.kwargs))
logging.basicConfig(
level=logging.DEBUG,
format="(%(threadName)s) %(message)s",
)
for i in range(5):
t = Mythread(args=(i,), kwargs={"a": "A", "b": "B"})
t.start()
结果:
(Thread-1) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5) running with (4,) and {'a': 'A', 'b': 'B'}
定时器线程
Timer在一个延迟后开始工作,而且可以被任意时刻被取消。
t = threading.Timer(float, target)
# 设定线程名
t.setName(“t1”)
# 取消运行
t.cancel()
线程之间同步操作
如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,时,事件(Event)对象是实现线程间安全通信的一种简单的方法。
event.wait():如果event.is_set()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
event.is_set():当内部标志为True返回True。与isSet()方法一样。
下面是一个简单的两个线程的例子:
import logging
import threading
import time
def wait_for_event(e):
"""
waiting for the event to be set before do anything
:param e: Event对象的形参
:return:
"""
logging.debug("waiting")
# 这个线程被阻塞了
event_is_set = e.wait()
# 如果事件没有set就永远不会执行
logging.debug("event set:%s", event_is_set)
def wait_for_event_timeout(e, t):
"""
wait t seconds and then timeout
:param e: Event对象的形参
:param t: 等待事件的时间(seconds)
:return:
"""
while not e.is_set():
logging.debug("wait_for_event_timeout")
# 表示等待事件的时间为t
event_is_set = e.wait(t)
# 当事件set后或者时间超过t才会继续执行
logging.debug("event set:%s", event_is_set)
if event_is_set:
logging.debug("processing event")
else:
logging.debug("doing other work")
logging.basicConfig(
level=logging.DEBUG,
format="(%(threadName)-10s) %(message)s",
)
# 实例化的Event对象
e = threading.Event()
t1 = threading.Thread(
name="block",
target=wait_for_event,
args=(e,),
)
t1.start()
t2 = threading.Thread(
name="noblock",
target=wait_for_event_timeout,
args=(e, 2)
)
t2.start()
logging.debug("waiting before calling Event.set()")
time.sleep(0.3)
# 启动事件
e.set()
logging.debug("Event is set")
结果:
(block ) waiting
(noblock ) wait_for_event_timeout
(MainThread) waiting before calling Event.set()
(MainThread) Event is set
(block ) event set:True
(noblock ) event set:True
(noblock ) processing event
控制资源访问
threading.LOCK() LocK锁对象
threading.LOCK().acquire() 获取底层锁
threading.LOCK().release() 释放底层锁
注意python中GIL与LOCK的区别,这里就不废话了。
下面是一个两个线程使用lock锁的例子:
import logging
import random
import threading
import time
class Counter:
def __init__(self, start=0):
# 这是Counter的专用锁对象
self.lock = threading.Lock()
self.value = start
def increment(self):
logging.debug("waiting for lock")
# 拿到lock锁
self.lock.acquire()
try:
# 无论谁拿到这个锁,每显示一次value就会加一
logging.debug("acquired lock")
self.value = self.value + 1
finally:
# 释放lock锁
self.lock.release()
def worker(c):
"""
make Counter().value + 2
:param c: Counter实例化对象
:return:
"""
for i in range(2):
pause = random.random()
logging.debug("sleeping % 0.02f", pause)
time.sleep(pause)
c.increment()
logging.debug("done")
logging.basicConfig(
level=logging.DEBUG,
format="(%(threadName)-10s %(message)s)"
)
counter = Counter()
for i in range(2):
# 生成两个线程
t = threading.Thread(target=worker, args=(counter, ))
t.start()
logging.debug("waiting for worker threads")
# 拿到当前python程序的主线程
main_thread = threading.main_thread()
让所有未执行完的其他线程等待。
for t in threading.enumerate():
if t is not main_thread:
t.join()
# 最后计算counter的value值
logging.debug(("Counter:%d" % counter.value))
结果
(Thread-1 sleeping 0.43)
(Thread-2 sleeping 0.84)
(MainThread waiting for worker threads)
(Thread-1 waiting for lock)
(Thread-1 acquired lock)
(Thread-1 sleeping 0.96)
(Thread-2 waiting for lock)
(Thread-2 acquired lock)
(Thread-2 sleeping 0.24)
(Thread-2 waiting for lock)
(Thread-2 acquired lock)
(Thread-2 done)
(Thread-1 waiting for lock)
(Thread-1 acquired lock)
(Thread-1 done)
(MainThread Counter:4)
在写代码中,应该避免死锁或者竞争条件。所有种类的锁还可以如下地写,with语句自动控制锁的获取与释放。
with lock_A:
# 业务代码
statement
再入锁
正常的Lock对象是不能请求多次的。同一个调用链中不同函数访问同一个锁的话,会产生副作用。
threading.RLock() 可以让同一个线程的不同代码“重新获得”锁
同步线程
threading.Condition(),Condition使用了一个Lock,所以可以绑定一个共享资源,使多个线程等待这个资源的更新再启动。
当然Condition也可以显示地使用acquire()和release()方法。
一个简单的示例
import logging
import threading
import time
def consumer(cond):
"""
等待condition设置然后再使用资源
:param cond:
:return:
"""
logging.debug("开启consumer线程")
with cond:
cond.wait()
logging.debug("对consumer线程资源可用")
def producer(cond):
"""
配置资源
:param cond:
:return:
"""
logging.debug("开始producer线程")
with cond:
logging.debug("使资源可用")
# 唤醒所有等待的线程,老的写法叫notifyAll()
cond.notify_all()
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(threadName)-2s %(message)s"
)
condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer,
args=(condition,))
c2 = threading.Thread(name="c2", target=consumer,
args=(condition,))
p = threading.Thread(name="p", target=producer,
args=(condition, ))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
结果:
2019-01-26 11:56:06,025 c1 开启consumer线程
2019-01-26 11:56:06,226 c2 开启consumer线程
2019-01-26 11:56:06,426 p 开始producer线程
2019-01-26 11:56:06,426 p 使资源可用
2019-01-26 11:56:06,426 c2 对consumer线程资源可用
2019-01-26 11:56:06,427 c1 对consumer线程资源可用
屏障barrier是另一种线程同步机制。Barrier建立一个控制点,阻塞所有的参与的线程,直到所有的线程都到达这一点,然后同时释放阻塞的线程。
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
"waiting for barrier with {} others.".format(barrier.n_waiting))
# 所有等待的线程都在等待时,所有的线程都被同时释放了。
worker_id = barrier.wait()
print(threading.current_thread().name, 'after barrier', worker_id)
NUM_THREAD = 3
barrier = threading.Barrier(NUM_THREAD)
# 推倒式
threads = [
threading.Thread(
name="worker - %s" % i,
target=worker,
args=(barrier, )
)
for i in range(NUM_THREAD)
]
for t in threads:
print(t.name, "starting")
t.start()
time.sleep(0.1)
for t in threads:
t.join()
结果:
worker - 0 starting
worker - 0 waiting for barrier with 0 others.
worker - 1 starting
worker - 1 waiting for barrier with 1 others.
worker - 2 starting
worker - 2 waiting for barrier with 2 others.
worker - 2 after barrier 2
worker - 1 after barrier 1
worker - 0 after barrier 0
abort()方法会使所有等待线程接收一个BrokenBarrierError。直到reset方法恢复,重新开始拦截。
限制资源的并发访问
如果多个线程同时访问一个资源,但要限制总数。这个可以使用Semaphore来管理。
使用方法:
s = threading.Semaphore(2)
t = threading.Thread(
target=worker,
name="t1",
args=(s, )
)
线程特定的数据
对于一些需要保护的资源,需要对这些并非资源所有者的线程隐藏。 threading.local()函数会创建一个对象,它能隐藏值,除非在某个线程中设置了这个属性,这个线程才能看到它。
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug("No value yet")
else:
logging.debug("value=%s" % val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
logging.basicConfig(
level=logging.DEBUG,
format="(%(threadName)-10s %(message)s)",
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
# 这个worker是看不到local_data的
for i in range(2):
t = threading.Thread(target=worker, args=(local_data, ))
t.start()
t.join()
# 使用子类,来初始化所有的线程开始时都有相同的值
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug("Initializing %s" % self)
self.value = value
local_data = MyLocal(1000)
# 同样的worker调用__init__(),每调用一次以设置默认值
for i in range(2):
t = threading.Thread(target=worker, args=(local_data, ))
t.start()
t.join()
结果:
(MainThread No value yet)
(MainThread value=1000)
(Thread-1 No value yet)
(Thread-1 value=76)
(Thread-2 No value yet)
(Thread-2 value=88)
(MainThread Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
(Thread-3 Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
(Thread-3 value=1000)
(Thread-3 value=31)
(Thread-4 Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
(Thread-4 value=1000)
(Thread-4 value=7)
转自 https://www.cnblogs.com/haoqirui/p/10321194.html