进程会启动一个解释器进程,线程共享一个解释器进程
Python的线程开发
python的线程开发使用标准库threading
一、Thread类
签名:
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
参数名 | 含义 |
---|---|
target | 线程调用的对象,就是目标函数 |
name | 为线程起个名字 |
args | 为目标函数传递实参,元祖 |
kwargs | 为目标函数关键字传参,字典 |
二、线程启动
# 线程启动
import threading
# 最简单的线程程序
def worker():
print('I am working')
print('Fineshed')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
通过threading.Thread创建一个线程对象,target是目标函数,name可以指定名称。
但是线程没有启动,需要调用start方法。
线程之所以执行函数,是因为线程中就是执行代码的,而最简单的的封装就是函数,所以还是函数调用。
函数执行完,线程也就退出了。
三、线程的退出
Python没有提供线程退出的方法,线程在下面情况时退出:
1、线程函数内语句执行完毕
2、线程函数中抛出未处理的异常
# 线程的退出
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(2)
print("I'm working")
count += 1
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
print('==End==')
# 输出结果
==End==
I'm working
I'm working
I'm working
I'm working
I'm working
I'm working
python的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。
四、线程的传参
# 线程的传参
import threading
import time
def add(x,y):
print("{} + {} = {}".format(x, y, x+y, threading.current_thread().ident))
thread1 = threading.Thread(target=add, name='add', args=(4,5)) # 线程对象
thread1.start() # 启动线程
time.sleep(2)
thread2 = threading.Thread(target=add, name='add', args=(5,), kwargs={'y':4}) # 线程对象
thread2.start() # 启动线程
time.sleep(2)
thread3 = threading.Thread(target=add, name='add', kwargs={'x':4, 'y':5}) #线程对象
thread3.start()
线程传参和函数传参没什么区别,本质上就是函数传参。
五、threading的属性和方法
名称 | 含义 |
---|---|
name | 只是一个名字,一个标识,名称可以重名。getName(), setName()获取、设置这个名词 |
ident | 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。此ID可以重复使用 |
is_alive() | 返回线程是否活着 |
注意:线程的name,这是一个名称,可以重复;ID必须唯一,但可以在线程退出后再利用。
# threading的属性和方法
import threading
import time
def showthreadinfo():
print("currentthread = {}".format(threading.current_thread()))
print("main thread = {}".format(threading.main_thread()))
print("active count = {}".format(threading.active_count()))
def worker():
count = 0
showthreadinfo()
while True:
if (count > 5):
break
time.sleep(1)
count += 1
print("I'm working")
t = threading.Thread(target=worker, name='worker') # 线程对象
showthreadinfo()
t.start() # 启动
print('==End==')
# 输出:
currentthread = <_MainThread(MainThread, started 4320764736)>
main thread = <_MainThread(MainThread, started 4320764736)>
active count = 1
currentthread = <Thread(worker, started 123145549832192)>
==End==
main thread = <_MainThread(MainThread, stopped 4320764736)>
active count = 2
I'm working
I'm working
I'm working
I'm working
I'm working
I'm working
名称 | 含义 |
---|---|
start() | 启动线程。每一个线程必须且只能执行该方法一次 |
run() | 运行线程函数 |
start方法
import threading
import time
def worker():
count = 0
while True:
if (count > 5):
break
time.sleep(1)
count += 1
print("worker running")
class MyThread(threading.Thread):
def start(self):
print('start~~~~~~~~~~~')
super().start() # 调父类(就是Thread类)的start()
def run(self):
print('run~~~~~~~~~~~~~')
super().run() # 调父类的run方法
t = MyThread(name='worker', target=worker)
t.start()
#t.run()
# t.start()运行结果
start~~~~~~~~~~~
run~~~~~~~~~~~~~
worker running
worker running
worker running
worker running
worker running
worker running
# t.run()运行结果
run~~~~~~~~~~~~~
worker running
worker running
worker running
worker running
worker running
worker running
start()方法会调用run()方法,而run()方法可以运行函数
start和run的区别
在线程函数中,增加打印线程的名字和语句,看看能看到什么信息?
import threading
import time
def worker():
count = 0
while True:
if (count > 5):
break
time.sleep(1)
count += 1
print("worker running")
print(threading.current_thread().name)
class MyThread(threading.Thread):
def start(self):
print('start~~~~~~~~~~~')
super().start() # 调父类(就是Thread类)的start()
def run(self):
print('run~~~~~~~~~~~~~')
super().run() # 调父类的run方法
t = MyThread(name='worker', target=worker)
t.start()
#t.run() # 分别执行start或者run方法
# t.start()运行结果
start~~~~~~~~~~~
run~~~~~~~~~~~~~
worker running
worker
worker running
worker
worker running
worker
worker running
worker
worker running
worker
worker running
worker
# t.run()运行结果
run~~~~~~~~~~~~~
worker running
MainThread
worker running
MainThread
worker running
MainThread
worker running
MainThread
worker running
MainThread
worker running
MainThread
使用start方法启动线程,是启动了一个新的线程,名字叫做worker运行。但是使用run方法并没有启动新的线程,就是在主线程中调用了一个普通的函数而已。
因此,启动线程请使用start方法,才能启动多个线程。
六、多线程
一个进程中有多个线程,实现一种并发
import threading
import time
def worker():
count = 0
while True:
if (count > 3):
break
time.sleep(1)
count += 1
print("worker running")
print(threading.current_thread().name, threading.current_thread().ident)
class MyThread(threading.Thread):
def start(self):
print('start~~~~~~~~~~~')
super().start() # 调父类(就是Thread类)的start()
def run(self):
print('run~~~~~~~~~~~~~')
super().run() # 调父类的run方法
t1 = MyThread(name='worker1', target=worker)
t2 = MyThread(name='worker2', target=worker)
t1.start()
t2.start()
# 运行结果
start~~~~~~~~~~~
run~~~~~~~~~~~~~
start~~~~~~~~~~~
run~~~~~~~~~~~~~
worker running
worker1 123145457434624
worker running
worker2 123145462689792
worker running
worker running
worker1 123145457434624
worker2 123145462689792
worker running
worker running
worker2 123145462689792
worker1 123145457434624
worker running
worker running
worker1 123145457434624
worker2 123145462689792
可以看到worker1和worker2交替执行
若改成run方法的话:
# t1.run()方法和t2.run()方法运行结果
run~~~~~~~~~~~~~
worker running
MainThread 4320764736
worker running
MainThread 4320764736
worker running
MainThread 4320764736
worker running
MainThread 4320764736
run~~~~~~~~~~~~~
worker running
MainThread 4320764736
worker running
MainThread 4320764736
worker running
MainThread 4320764736
worker running
MainThread 4320764736
可以看到run()方法没有开新的线程,就是普通的函数调用,所以执行完t1.run(),然后执行t2.run(),不是多线程
而使用start方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程
一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。一个进程至少有一个主线程。其他线程称为工作线程。
线程安全
# 七、线程安全
import threading
def worker():
for x in range(10):
print("{} is running".format(threading.current_thread().name))
for x in range(1,5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
在iPython中的运行结果:
可以看到,本应该是一行行打印,但是很多字符串打在了一起。说明print函数被打断了,被线程切换打断了。
print函数分两步,第一步打印字符串,第二步换行,就在这之间,发生了线程的切换。
这说明print函数是线程不安全的
线程安全:线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的
上例中,本以为print应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换行,而且发生这种情况的时机不确定,所以,print函数不是线程安全函数。
那么多线程编程的时候,print输出日志,不能保证一个输出一定后面立即换行了,解决方案:
- 1、不让print打印换行
import threading
def worker():
for x in range(10):
print("{} is running\n".format(threading.current_thread().name, end=''))
for x in range(1,5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
字符串是不可变的类型,它可以作为一个整体不可分割输出。end=''就不再让print输出换行了。
运行结果:
- 2、使用logging
标准库里面的logging模块,日志处理模块,线程安全的,生成环境代码都使用logging
import threading
import logging
def worker():
for x in range(10):
logging.warning("{} is running".format(threading.current_thread().name))
for x in range(1,5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
运行结果:
daemon线程和non-daemon线程
进程靠线程执行代码,至少有一个主线程,其他线程是工作线程。主线程是第一个启动的线程。
父线程:如果线程A中启动了一个线程B,A就是B的父线程 子线程:B就是A的子线程
Python中,构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好
源码:
# 在Thread的__init__方法中
if daemon is not None:
self._daemonic = daemon # 用户设定bool值
else:
self._daemonic = current_thread().daemon
self._ident = None
线程daemon属性,如果设定就是用户的设置,否则就取当前线程的daemon值。
主线程是non-daemon线程,即daemon = False
import time
import threading
def foo():
time.sleep(2)
for i in range(5):
print(i)
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=False)
t.start()
print('Main Thread Exiting')
# 输出结果
Main Thread Exiting
0
1
2
3
4
发现线程t依然执行,主线程已经执行完,但是一直等着线程t。
修改为t = threading.Thread(target=foo, daemon=True)
后
# 输出结果
Main Thread Exiting
名称 | 含义 |
---|---|
daemon属性 | 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常 |
isDaemon() | 是否是daemon线程 |
setDaemon | 设置为daemon线程,必须在start方法之前设置 |
会发现程序立即结束了,根本没有等线程t。
名称 | 含义 |
---|---|
daemon属性 | 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常 |
isDaemon() | 是否是daemon线程 |
setDaemon | 设置为daemon线程,必须在start方法之前设置 |
总结:
- 线程具有一个daemon属性,可以显示设置为True或False,也可以不设置,则取默认值None。如果不设置daemon,就取当前线程的daemon来设置它。
- 主线程是non-daemon线程,即daemon=False。从主线程创建的所有线程的不设置daemon属性,则默认都是daemon=False,也就是non-daemon线程。
- Python程序在没有活着的non-daemon线程运行时退出,也就是剩下的只能是daemon线程,主线程才能退出,否则主线程就只能等待。
思考下面程序的输出:
import time
import threading
def bar():
time.sleep(5)
print('bar')
def foo():
for i in range(10):
print(i)
t = threading.Thread(target=bar, daemon=False)
t.start()
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=True)
t.start()
print('Main Thread Exiting')
# 输出结果
0
Main Thread Exiting
可以看到,并不会输出‘bar’这个字符串,进行修改:
time.sleep(2) # 在原先print上面加上这一句
print('Main Thread Exiting')
# 输出结果
0
1
2
3
4
5
6
7
8
9
Main Thread Exiting
bar
可以看到‘bar’字符串打印出来了
再看一个例子,看看主线程什么时候结束daemon线程
# 看看主线程何时结束daemon线程
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo, args=(5,), daemon=True)
t1.start()
t2 = threading.Thread(target=foo, args=(10,), daemon=False)
t2.start()
time.sleep(2)
print('Main Thread Exiting')
# 输出结果
0
0
1
1
Main Thread Exiting
2
2
3
3
4
4
5
6
7
8
9
调换10和5看看效果
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo, args=(10,), daemon=True) # 调换10和20看看效果
t1.start()
t2 = threading.Thread(target=foo, args=(5,), daemon=False)
t2.start()
time.sleep(2)
print('Main Thread Exiting')
# 输出结果
0
0
1
1
Main Thread Exiting
2
2
3
3
4
4
5
上例说明,如果有non-daemon线程的时候,主线程退出时,也不会杀掉所有的daemon线程,直到所有non-daemon线程全部结束,如果还有daemon线程,主线程需要退出,会结束所有daemon线程、退出。
join方法
先看例子
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo, args=(10,), daemon=True)
t1.start()
t1.join()
print("Main Thread Exiting")
# 输出结果
0
1
2
3
4
5
6
7
8
9
Main Thread Exiting
然后取消join方法看一下结果:
# 输出结果
0
Main Thread Exiting
使用了join方法后,daemon线程执行完了,主线程才退出来。
join(timeout=None)
是线程的标准方法之一。一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止。一个线程可以被join多次。
timeout参数指定调用者多久,没有设置超时,就一直等到被调用线程结束。调用谁的join方法,就是join谁,就要等谁。
八、daemon线程应用场景
daemon thread的作用是:当你把一个线程设置为daemon,它会随主线程的退出而退出。
主要应用场景有:
- 后台任务。如:发送心跳包、监控,这种场景最多
- 主线程工作才有用的线程。如主线程中维护这公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适。
- 随时可以被终止的线程。如果主线程退出,想所有其他工作线程一起退出,就使用daemon=True来创建工作线程。
比如:开启一个线程定时判断WEB服务是否正常工作,主线程退出,工作线程也应该随着主线程退出一起退出。这种daemon线程一旦创建,就可以忘记它了,只看关系主线程什么时候退出就行了。简言之,daemon线程,简化了程序员手动关闭线程的工作。
- 如果在non-daemon线程A中,对另一个daemon线程B使用了join方法,这个线程B设置成daemon就没有什么意义了,因为non-daemon线程A总是要等待B。
- 如果在一个daemon线程C中,对另一个daemon线程D使用了join方法,只能说明C要等待D,主线程退出,C和D不管是否结束,也不管他们谁等谁,都要被杀掉
例如:
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print("t1's daemon = {}".format(threading.current_thread().isDaemon()))
t2 = threading.Thread(target=bar)
t2.start()
print("t2's daemon = {}".format(t2.isDaemon()))
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
time.sleep(3)
print("main thread exiting")
# 输出结果:
t1's daemon = True
t2's daemon = True
bar
bar
bar
main thread exiting
上例,只要主线程退出,2个工作线程都结束,可以使用join,让线程结束不了。
九、threading.local类
import threading
import time
def worker(): # 局部变量实现
x = 0
for i in range(100):
time.sleep(0.001)
x += 1
print(threading.current_thread(), x)
for i in range(10):
threading.Thread(target=worker).start()
# 输出结果
<Thread(Thread-1, started 7128)> 100
<Thread(Thread-2, started 12008)> 100
<Thread(Thread-3, started 11536)> 100
<Thread(Thread-4, started 9792)> 100
<Thread(Thread-5, started 11608)> 100
<Thread(Thread-7, started 2340)> 100
<Thread(Thread-6, started 12028)> 100
<Thread(Thread-8, started 12204)> 100
<Thread(Thread-9, started 11948)> 100
<Thread(Thread-10, started 11816)> 100
上例使用多线程,每个线程完成不同的计算任务。x是局部变量。
能否改造成使用全局变量完成?
import threading
import time
class A:
def __init__(self):
self.x = 0
global_data = A() # 全局变量
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
# 输出结果
<Thread(Thread-2, started 11012)> 984
<Thread(Thread-4, started 11512)> 985
<Thread(Thread-3, started 9020)> 987
<Thread(Thread-5, started 9436)> 988
<Thread(Thread-1, started 11660)> 989
<Thread(Thread-7, started 8404)> 990
<Thread(Thread-8, started 7400)> 991
<Thread(Thread-6, started 8236)> 993
<Thread(Thread-10, started 4476)> 994
<Thread(Thread-9, started 11760)> 995
上例虽然使用了全局对象,但是线程之间互相干扰,导致了错误的结果。
能不能使用全局对象,还能保持每个线程使用不同的数据呢?Python提供了threading.local
类,将这个实例得到一个全局对象,但是不同的线程使用这个对象存储的数据其他线程看不见。
import threading
import time
# 全局变量
global_data = threading.local()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
# 打印结果:
<Thread(Thread-3, started 5220)> 100
<Thread(Thread-7, started 4016)> 100
<Thread(Thread-4, started 11504)> 100
<Thread(Thread-2, started 10376)> 100
<Thread(Thread-1, started 11556)> 100
<Thread(Thread-8, started 8908)> 100
<Thread(Thread-6, started 4752)> 100
<Thread(Thread-10, started 9796)> 100
<Thread(Thread-5, started 10552)> 100
<Thread(Thread-9, started 10480)> 100
可以看到,结果显示和使用局部变量的结果一样。
再看一个threading.local的例子:
# threading.local例子
import threading
X = 'abc'
ctx = threading.local() # 注意这个对象所处的线程
ctx.x = 123
print(ctx, type(ctx), ctx.x)
def worker():
print(X)
print(ctx)
print(ctx.x)
print('working')
worker() # 普通函数调用
print()
threading.Thread(target=worker).start() # 另起一个线程
# 打印结果
<_thread._local object at 0x000000000299E570> <class '_thread._local'> 123
abc
<_thread._local object at 0x000000000299E570>
123
working
abc
<_thread._local object at 0x000000000299E570>
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Users\pc\AppData\Local\Programs\Python\Python36\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\Users\pc\AppData\Local\Programs\Python\Python36\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "F:/PycharmProjects/0524_线程进程.py", line 127, in worker
print(ctx.x)
AttributeError: '_thread._local' object has no attribute 'x'
从运行结果来看,另起一个线程打印ctx.x出错了:
AttributeError: '_thread._local' object has no attribute 'x'
但是,ctx打印没有出错,说明能看到ctx,但是ctx中的x看不到,这个x不能跨线程。
threading.local类构建了一个大字典,其元素是每一线程实例的地址为key和线程对象引用线程单独的字典的映射,如下:
{ id(Thread) -> (ref(Thread), thread-local dict) }
通过threading.local实例就可在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全。
十、定时器Timer/延迟执行
threading.Timer继承自Thread,这个类用来定义多久执行一个函数。
class threading.Timer(interval, function, args=None, kwargs=None)
start方法执行之后,Timer对象会处于等待状态,等待了interval之后,开始执行function函数的。如果在执行函数之前的等待阶段,使用了cancel方法,就会跳过执行函数结束。
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.setName('w1')
t.start() # 启动线程
print(threading.enumerate())
# 打印结果:
[<_MainThread(MainThread, started 7360)>, <Timer(w1, started 6428)>]
2018-05-27 15:40:13,706 w1 8052 in worker
在上面的基础上加入t.cancel()后:
t.cancel()
time.sleep(1)
print(threading.enumerate())
# 打印结果
[<_MainThread(MainThread, started 6660)>, <Timer(w1, started 7284)>]
[<_MainThread(MainThread, started 6660)>]
如果线程中worker函数已经开始执行,cancel就没有任何效果了。
总结:
Timer是线程Thread的子类,就是线程类,具有线程的能力和特征。
它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它。
提前cancel:
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.setName('w1')
t.cancel() # 提前取消
t.start() # 启动线程
print(threading.enumerate())
time.sleep(3)
print(threading.enumerate())
# 打印结果
[<_MainThread(MainThread, started 6684)>]
[<_MainThread(MainThread, started 6684)>]