1 递归锁Rlock()和互斥锁
递归锁
代码块
from threading import Thread,RLock,Lock
import time
def func(r,i):
r.acquire() #三个acquire()不是并列,是内外嵌套,第一层,在同一个线程内,递归锁可以无止尽的acquire,但是互斥锁不行
r.acquire() #第一次里面的第二层
r.acquire() #第二层里面的最里层
print(i)
r.release() #释放最里层
r.release() #释放中间
r.release() #释放最外层,三层全部释放了,才能开始下一个线程。
#在不同的线程内,递归锁是保证只能被一个线程拿到钥匙,然后无止尽的acquire,其他线程等待,只有前面的release()全部的acquire()才能开始
r=RLock() #开启一个递归锁
for i in range(10):
Thread(target=func,args=(r,i,)).start() #开启10个线程
互斥锁
代码块
from threading import Thread,Lock #导入线程,递归锁
import time
def func(i,l):
l.acquire() #互斥锁只能有一个acquire()和一个release()
time.sleep(0.5)
print(i)
l.release()
l = Lock()
for i in range(10):
Thread(target=func,args=(i,l)).start()
2 条件Condition
代码块
条件 涉及 4个方法:
con.acquire()
con.release()
con.wait() # 假设有一个初始状态为False,阻塞。一旦接受到notify的信号后,变为True,不再阻塞
con.notify(int) 给wait发信号,发int个信号,会传递给int个wait,让int个线程正常执行
条件的例子
代码块
from threading import Condition,Thread
import time
def func(con,i):
con.acquire()# 主线程和10个子线程都在抢夺递归锁的一把钥匙。
# 如果主线程抢到钥匙,主线程执行while 1,input,然后notify发信号,还钥匙。但是,此时如果主线程执行特别快
# 极有可能接下来主线程又会拿到钥匙,那么此时哪怕其他10个子线程的wait接收到信号,但是因为没有拿到钥匙,所以其他子线程还是不会执行
con.wait()
print('第%s个线程执行了'%i)
con.release()
con = Condition()
for i in range(10):
t = Thread(target=func,args = (con,i))
t.start() #开启10个线程
while 1:
# print(123)
con.acquire()
num = input('>>>')
con.notify(int(num))
con.release()
time.sleep(0.5) # 为什么要加这个time.sleep(0.5) 主线程和10个子线程都在抢夺递归锁的一把钥匙。
# 如果主线程抢到钥匙,主线程执行while 1,input,然后notify发信号,还钥匙。但是,此时如果主线程执行特别快
# 极有可能接下来主线程又会拿到钥匙,那么此时哪怕其他10个子线程的wait接收到信号,但是因为没有拿到钥匙,所以其他子线程还是不会执行
#所以没有这个time.sleep(0.5) 子线程是不会拿到钥匙的。
3 守护进程和守护线程与父进程代码
守护线程
守护线程不是随着父线程的代码执行结束而结束,守护线程是随着父线程的执行结束而结束
代码块
from threading import Thread
from multiprocessing import Process
import time
def func_daemon():
time.sleep(3)
print('这是守护线程')
def func():
time.sleep(5)
print('这是普通线程')
# 守护进程是随着父进程的代码执行结束而结束
if __name__ == '__main__':
t = Thread(target=func_daemon,)
t.daemon = True #设置为守护
t.start()
t1 = Thread(target=func, )
t1.start()
print('这里是父线程')
time.sleep(20)
守护进程是随着父进程的代码执行结束而结束
代码块
from threading import Thread
from multiprocessing import Process
import time
def func_daemon():
# time.sleep(3)
print('这是守护进程')
def func():
time.sleep(2)
print('这是普通进程')
if __name__ == '__main__':
p = Process(target=func_daemon, )
p.daemon = True
p.start()
p1 = Process(target=func, )
p1.start()
print('这是父进程')#如果这里没有时间延迟,根本不会执行守护进程,因为代码已经结束了,虽然主进程还在等待普通进程。
4 线程池
4-1多任务提交
代码块
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
sum = 0
for i in range(num):
sum += i
print(sum)
t = ThreadPoolExecutor(20)
start = time.time()
t.map(func,range(1000))# 提交多个任务给池中。 等效于 for + submit
t.shutdown()
print(time.time() - start)
4-2 回调函数
代码块
from concurrent.futures import ProcessPoolExecutor
# 不管是ProcessPoolExecutor的进程池 还是Pool的进程池,回调函数都是父进程调用的。
import os
import requests
def func(num):
sum = 0
for i in range(num):
sum += i
return sum #返回一个sum,回调函数接收
def call_back_fun(res):
print(res.result(),os.getpid())
# print(os.getpid())
if __name__ == '__main__':
print(os.getpid()) #父进程的进程号
t = ProcessPoolExecutor(20)
for i in range(1000):
t.submit(func,i).add_done_callback(call_back_fun) #指明是哪一个回调函数
t.shutdown()
4-3 怎样拿到线程池的返回值
第一钟方法:
代码块
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
sum = 0
for i in range(num):
sum += i
return sum
t = ThreadPoolExecutor(20)
# 下列代码是用map的方式提交多个任务,对应 拿结果的方法是__next__() 返回的是一个生成器对象
res = t.map(func,range(1000))
t.shutdown()
print(res.__next__())
print(res.__next__())
print(res.__next__())
print(res.__next__())
第二种方法:
代码块
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
sum = 0
for i in range(num):
sum += i
return sum
t = ThreadPoolExecutor(20)
# 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result
res_l = []
for i in range(1000):
re = t.submit(func,i)
res_l.append(re)
t.shutdown()
# print(i.result() for i in res_l)
[print(i.result()) for i in res_l]
# 在Pool进程池中拿结果,是用get方法。 在ThreadPoolExecutor里边拿结果是用result方法
5 进程池,多进程,多线程处理任务的效率快慢对比
5-1进程池
代码块
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
# concurrent.futures 这个模块是异步调用的机制
# concurrent.futures 提交任务都是用submit
# for + submit 多个任务的提交
# shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
# from multiprocessing import Pool.apply / apply_async
import time
def func(num):
sum = 0
for i in range(num):
for j in range(i):
for x in range(j):
sum += x
print(sum)
if __name__ == '__main__':
# pool的进程池的效率演示
p = Pool(5)
start = time.time()
for i in range(100):
p.apply_async(func,args=(i,)) #异步
p.close()
p.join()
print('Pool进程池的效率时间是%s'%(time.time() - start))
#0.47秒用时
5-2 多进程
代码块
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
# concurrent.futures 这个模块是异步调用的机制
# concurrent.futures 提交任务都是用submit
# for + submit 多个任务的提交
# shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
# from multiprocessing import Pool.apply / apply_async
import time
def func(num):
sum = 0
for i in range(num):
for j in range(i):
for x in range(j):
sum += x
print(sum)
if __name__ == '__main__':
# 多进程的效率演示
tp = ProcessPoolExecutor(5)
start = time.time()
for i in range(100):
tp.submit(func, i)
tp.shutdown() # 等效于 进程池中的 close + join
print('进程池的消耗时间为%s' % (time.time() - start))
#用时0.56秒
5-3 多线程
代码块
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
# concurrent.futures 这个模块是异步调用的机制
# concurrent.futures 提交任务都是用submit
# for + submit 多个任务的提交
# shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
# from multiprocessing import Pool.apply / apply_async
import time
def func(num):
sum = 0
for i in range(num):
for j in range(i):
for x in range(j):
sum += x
print(sum)
if __name__ == '__main__':
# 多线程的效率
tp = ThreadPoolExecutor(20)
start = time.time()
for i in range(100):
tp.submit(func,i)
tp.shutdown()# 等效于 进程池中的 close + join
print('线程池的消耗时间为%s'%(time.time() - start))
#用时0.42
5-4结论
针对计算密集的程序来说
不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率差不多一样
ThreadPoolExecutor 的效率要差很多
所以 当计算密集时,使用多进程。
针对IO等阻塞的程序来说,使用线程切换的速度快,所以使用多线程。
别跑,点个赞再走