@(python)
目录
引言
Executor和Future
使用submit来操作线程池/进程池
add_done_callback实现回调函数
引言
Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。
Executor和Future
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future这个概念相信有java和nodejs下编程经验的朋友肯定不陌生了,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
p.s:如果你依然在坚守Python2.x,请先安装futures模块。
pip install futures
使用submit来操作线程池/进程池
我们先通过下面这段代码来了解一下线程池的概念:
# example1.py 
from concurrent.futures import ThreadPoolExecutor 
import time 
def return_future_result(message): 
    time.sleep(2) 
    return message 
pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池 
future1 = pool.submit(return_future_result, ("hello"))  # 往线程池里面加入一个task 
future2 = pool.submit(return_future_result, ("world"))  # 往线程池里面加入一个task 
print(future1.done())  # 判断task1是否结束 
time.sleep(3) 
print(future2.done())  # 判断task2是否结束 
print(future1.result())  # 查看task1返回的结果 
print(future2.result())  # 查看task2返回的结果 
执行结果:
False 
True 
hello 
world 
解析:我们根据运行结果来分析一下。我们使用
submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。在第一个print语句中很明显因为time.sleep(2)的原因我们的future1没有完成,因为我们使用time.sleep(3)暂停了主线程,所以到第二个print语句的时候我们线程池里的任务都已经全部结束。
上面的代码我们也可以改写为进程池形式,api和线程池如出一辙,我就不罗嗦了。
# example2.py 
from concurrent.futures import ProcessPoolExecutor 
import time 
def return_future_result(message): 
    time.sleep(2) 
    return message 
pool = ProcessPoolExecutor(max_workers=2) 
future1 = pool.submit(return_future_result, ("hello")) 
future2 = pool.submit(return_future_result, ("world")) 
print(future1.done()) 
time.sleep(3) 
print(future2.done()) 
print(future1.result()) 
print(future2.result()) 
add_done_callback实现回调函数
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response
def download(futures):
    response = futures.result()  #会得到一个返回值,这个返回值就是task函数的返回值
    content = response.text
    tmp_list = response.url.split("/")
    filename = tmp_list[len(tmp_list)-1]
    print("正在下载:%s" %response.url)
    with open(filename,"w",encoding="utf-8") as f:
        f.write("%s\n%s" %(response.url,content))
        print("下载完成")
url_list = [
    "http://www.cnblogs.com/wupeiqi/articles/5713330.html",
    "http://blog.csdn.net/anzhsoft/article/details/19563091",
    "http://blog.csdn.net/anzhsoft/article/details/19570187"
]
thread_pool = ThreadPoolExecutor(max_workers=2) #生一个线程池对象,最大线程数为2个
for url in url_list:
    futures = thread_pool.submit(task,url)  #会得到一个Future对象
    #回调函数,会将futures本身当作参数传给download函数
    futures.add_done_callback(download)