系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
这里我把第二段话括起来了;因为第二段很关键;其实你真的明白了第二段话,就明白了什么是线程池;我们平时使用线程,是把这个函数启动;函数结束也就意味着线程的结束;但显然线程池不能这么干;为什么呢?如果你频繁的启用线程,关闭线程;那么效率会很低;所以线程池起了很多个线程;我们把函数附进去;即使这个函数运行结束了,我们也不会让这个线程消亡;就是线程在那里空转;但是函数已经是别的函数了;这才是线程池的精髓;
显然,python不会像C一样;要自己去写线程池;自然是有写好的模块;那么我们看看如何使用线程池;
=============================================================
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
Exectuor 提供了如下常用方法:
- submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池。
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。
Future 提供了如下方法:
- cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
- cancelled():返回 Future 代表的线程任务是否被成功取消。
- running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
- done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
- result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
- exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
- add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max,min,**kwargs):
my_sum = 0
for i in range(max):
#print(threading.current_thread().name + ' ' + str(i))
my_sum += 1
for key in kwargs.keys():
my_sum+=kwargs[key]
return my_sum-min
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50,10,middle = 5)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100,10,middle = 5)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()
这个case其实是我在网上找的,但是我润色了一下;那里润色了呢?就是我现在用的是多个参数的函数;这里面既有普通的参数,也有关键字参数;我觉得可以注意一下pool.submit的参数形式是什么样子的;
这里我想说的还有一点,要看的是现在pool的基本函数
一个创建类;用的是
pool = ThreadPoolExecutor(max_workers=2)
一个提交任务;用的是
pool.submit()
一个是关闭线程池;用的是
pool.shutdown()
然后就是提交了以后返回一个类为future(名字可以自己随便起),然后有done,result等方法;其中done是立即返回的,result是阻塞等待的;
==============================================================
下面我们要看看,就是其中result是一个阻塞等待的函数;如果我们不想阻塞等待,该怎么办呢?future(也就是提交以后返回的类)存在一个回调,add_done_callback,就是在这个回调函数;
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')
首先要明白一点,什么是回调函数;之前在用C的时候,那个时候我刚刚接触;感觉怎么也绕不回来;什么是回调?回调有啥用;为啥我想想那个时候绕不出来,是因为我自己当时第一次用的时候,是公司的框架代码中用的;那个时候框架说实话,一个是比较大,代码较为分散;一个是代码也不是很正规(几个所谓的领导自己闭门造车出来的框架);导致我没有太弄清楚回调的作用;所以我这里说了这么多废话,想告诫自己也是在座同仁,知识一定要从最基础,最正规的地方开始学起,用熟了以后才能熟练应用到各种场景;还有就是多总结;省的以后忘记了;
那么话说完;什么是回调?回调有啥用;这里我就总结一点最正规的用法;就是回调用做获取线程运行的某些结果;这就是最常用的用法;为什么这么说呢?先看看什么是回调;比如有两个函数,a,b;b是a的一个参数;b函数呢,有一些自己的定义的变量,内部的;此时a想要获取b的这些自定义的参数,显然,没有权限;那么怎么办呢?比较好的办法,b在函数体内,调用了a;把这些自定义的值,给了a;那么a就获取到了;就很方便的传出来了;
那么再想想;如果a,b在同一个线程里,顺序执行;b直接return结果就好了;不需要这么麻烦;所以说,当a,b不再一个线程的时候;这个时候优势就体现出来了;要想获取b的结果;就要定义个回调函数,传进去给b,b运行时候调用;就很方便的把结果传出来了;
关于回调,还有一个比较常用的方法;就是当存在库的时候;因为此时我们无法获取库的代码;库提供一些接口,我们想获取他的一些自己的值怎么办呢?库接口的参数此时应该提供一个回调函数的type,我们把回调函数赋进去,就可以很方便的获取到了
==========================================================
再看看map的使用
此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ' ' + str(i))
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (50, 100, 150))
print('--------------')
for r in results:
print(r)
上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。
通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。也就是说,上面 results 的第一个元素是 action(50) 的结果,第二个元素是 action(100) 的结果,第三个元素是 action(150) 的结果。
这里我试了半天,也没有好的办法传递多个参数,对于map,所以先这么用吧;后面有用到可以再说;
Thread 类有一个 Timer子类,该子类可用于控制指定函数在特定时间内执行一次。例如如下程序:
from threading import Timer
def hello(x):
print("hello, world"+x)
# 指定10秒后执行hello函数
t = Timer(3.0, hello,args = ("www",))
t.start()
这里Timer里面和Thread一样,但是Target不用使用,后面非关键字的参数也是args,关键字的参数也是kwargs;
使用 Timer 定时器有一个弊端,即只能控制线程在指定时间内执行一次任务,如果想实现每隔一段时间就执行一次,需要借助循环结构。
实际上,python还提供有一个更强大的、可用来定义执行任务调度的 sched 模块,该模块中含有一个 scheduler 类,可用来执行更复杂的任务调度。
scheduler 类常用的构造方法如下:
scheduler(timefunc=time.monotonic, delayfunc=time.sleep)
可以向该构造方法中传入 2 个参数(当然也可以不提供,因为都有默认值),分别表示的含义如下:
- timefunc:指定生成时间戳的函数,默认使用 time.monotonic 来生成时间戳;
- delayfunc:在未到达指定时间前,通过该参数可以指定阻塞任务执行的函数,默认采用 time.sleep() 函数来阻塞程序。
另外,scheduler 类中还提供有一些方法,表 1 罗列了常用的一些。
import threading
from sched import scheduler
def action(arg):
print(arg)
#定义线程要调用的方法,*add可接收多个以非关键字方式传入的参数
def thread_action(*add):
#创建任务调度对象
sche = scheduler()
#定义优先级
i = 3
for arc in add:
# 指定1秒后执行action函数
sche.enter(1, i, action,argument=(arc,))
i = i - 1
#执行所有调度的任务
sche.run()
#定义为线程方法传入的参数
my_tuple = ("http://c.biancheng.net/python/",\
"http://c.biancheng.net/shell/",\
"http://c.biancheng.net/java/")
#创建线程
thread = threading.Thread(target = thread_action,args =my_tuple)
#启动线程
thread.start()
程序结果
http://c.biancheng.net/java/
http://c.biancheng.net/shell/
http://c.biancheng.net/python/
注意,以上输出结果是在执行程序 1 秒后逐个输出的。
上面程序中,创建了 thread 子线程去执行 thread_action() 函数,在该函数中使用 scheduler 类调度了 3 个任务,这 3 个任务都指定的是 1 秒后执行,其优先级分别为 3、2、1。
由于是在同一时间执行这 3 个任务,因此优先级的设定决定了谁先执行、谁后执行。显然优先级为 1 的任务优先执行,优先级为 3 的最后执行。因此上面程序执行结果中字符串的输出顺序恰好和 my_tuple 元组中的顺序是相反的。
这里面需要注意的是,sche.run是阻塞的方式;但是没关系;我们是起了线程thread;所以阻塞无所谓,还是并行执行的;