concurrent.futures provides two executor classes with an identical interface:
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: executor.submit(fn, *args)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.submit(fn, *args)
import concurrent.futures
import time

nlist = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaitem(x):
    rs = count(x)
    print("item " + str(x) + " result " + str(rs))

def count(x):
    # CPU-bound busy loop: increments i ten million times
    for i in range(0, 10000000):
        i = i + 1
    return i * x

if __name__ == "__main__":
    # 1) sequential execution
    stime = time.perf_counter()
    for item in nlist:
        evaitem(item)
    print("sequential execution in " + str(time.perf_counter() - stime), "seconds")

    # 2) thread pool execution
    stime1 = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for item in nlist:
            executor.submit(evaitem, item)
    print("thread execution in " + str(time.perf_counter() - stime1), "seconds")

    # 3) process pool execution
    stime2 = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        for item in nlist:
            executor.submit(evaitem, item)
    print("process execution in " + str(time.perf_counter() - stime2), "seconds")
--------------------------------------------concurrent.futures.wait
wait() blocks the main thread until the condition given by return_when is met: ALL_COMPLETED (the default), FIRST_COMPLETED, or FIRST_EXCEPTION.
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import urllib.request

URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        print('%r page is %d bytes' % (url, len(conn.read())))

executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
    future = executor.submit(load_url, url)
    f_list.append(future)

# blocks until every future is done (return_when=ALL_COMPLETED by default)
print(wait(f_list))
print('main thread')
Output:
'http://www.163.com' page is 662047 bytes
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 54629 bytes
DoneAndNotDoneFutures(done={<Future at 0x2d0f898 state=finished returned NoneType>, <Future at 0x2bd0630 state=finished returned NoneType>, <Future at 0x2d27470 state=finished returned NoneType>}, not_done=set())
main thread
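The run above used the default ALL_COMPLETED. To unblock as soon as the first task finishes, pass return_when=FIRST_COMPLETED; a minimal self-contained sketch with a hypothetical toy task in place of load_url:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def sleep_and_return(n):  # hypothetical toy task standing in for load_url
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleep_and_return, n) for n in (3, 1, 2)]
    # unblocks as soon as the first future finishes; the others keep running
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print('done:', [f.result() for f in done])  # -> [1]
    print('still running:', len(not_done))      # -> 2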
shutdown() is the equivalent of a multiprocessing Pool's close() + join().
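A minimal sketch of that equivalence: the explicit shutdown(wait=True) call and the with-block do the same thing (refuse new submissions, then block until queued work finishes):

from concurrent.futures import ThreadPoolExecutor

def task(x):
    return x * 2

# explicit form
executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(task, 21)
executor.shutdown(wait=True)  # no new submits allowed; blocks until tasks finish
print(future.result())        # -> 42

# equivalent with-block form: __exit__ calls shutdown(wait=True)
with ThreadPoolExecutor(max_workers=2) as executor:
    print(executor.submit(task, 21).result())  # -> 42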
--------------------------------------------add_done_callback()
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
import os

def get_page(url):
    print('<%s> is getting [%s]' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code == 200:  # status 200 means the download succeeded
        return {'url': url, 'text': response.text}

def parse_page(res):
    # the callback is handed a Future object, not the return value,
    # so call res.result() first to get what get_page returned
    res = res.result()
    print('<%s> is parsing [%s]' % (os.getpid(), res['url']))
    with open('db.txt', 'a') as f:
        parse_res = 'url:%s size:%s\n' % (res['url'], len(res['text']))
        f.write(parse_res)

if __name__ == '__main__':
    # p = ThreadPoolExecutor()
    p = ProcessPoolExecutor()
    l = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in l:
        # whichever future finishes first runs its callback first; callbacks are
        # a general pattern that works for process pools and thread pools alike
        # (note: add_done_callback() returns None, so don't assign its result)
        p.submit(get_page, url).add_done_callback(parse_page)
    p.shutdown()  # like close() + join() on a multiprocessing Pool
    print('main', os.getpid())
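as_completed (imported but unused in the wait example above) is the callback-free alternative: it yields each future as it finishes, so the main thread does the post-processing itself. A minimal sketch with a hypothetical toy task:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def sleep_and_return(n):  # hypothetical toy task
    time.sleep(n)
    return n

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(sleep_and_return, n) for n in (3, 1, 2)]
    # yields each future as soon as it finishes, regardless of submission order
    for future in as_completed(futures):
        print('finished:', future.result())  # prints 1, then 2, then 3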