多线程基础
一个进程中默认有且只有一个线程,叫主线程
默认情况所有代码都是在主线程中执行
进程中主线程以外的线程都叫子线程
1.怎么让进程拥有子线程
在进程中创建threading模块中的Thread类的对象或Thread类的子类对象
2.程序(进程)的结束
只有进程中有的线程都结束了进程才会结束
from threading import *
from datetime import datetime
import time
def download(film_name: str):
print('%s开始下载:%s' % (film_name, datetime.now()))
time.sleep(5)
print('%s下载结束:%s' % (film_name, datetime.now()))
# download('《三体一:地球往事》')
# download('《三体二:黑暗森林》')
# download('《三体三:死神永生》')
1.创建线程对象 - 子线程
"""
线程对象 = Thread(target=需要在子线程中调用的函数,args=元组)
说明:target - 需要在子线程中执行的任务
args - target对应的函数在调用的时候传的参数
"""
t1 = Thread(target=download, args=('《三体一:地球往事》',))
t2 = Thread(target=download, args=('《三体二:黑暗森林》',))
t3 = Thread(target=download, args=('《三体三:死神永生》',))
# 2.让子线程执行子线程中的任务
# 线程对象.start() - 在子线程中调用target对应的函数,并且将args中的元素作为参数
# 线程开始之后只有自然死亡或者异常两种情况能终止
t1.start()
t2.start()
t3.start()
from threading import *
from datetime import datetime
import time
class DownloadThread(Thread):
def __init__(self, film_name):
super().__init__()
self.film_name = film_name
# 这个run方法是会在子线程中自动调用的方法,要求除了self以外不能有其他参数
def run(self) -> None:
print('%s开始下载%s' % (self.film_name, datetime.now()))
time.sleep(5)
print('%s结束下载%s' % (self.film_name, datetime.now()))
# 在子线程中执行下载操作
t1 = DownloadThread('电影1')
t2 = DownloadThread('电影2')
t3 = DownloadThread('电影3')
t1.start()
t2.start()
t3.start()
多线程
Thread Process
线程和进程对⽐
使⽤线程的⽅式不能很好的使⽤多核cpu的能⼒
from random import randint
from threading import Thread
results = []
def compute():
# 计算100万个1-100之间的随机数的和
total = sum([randint(1, 100) for i in range(1000000)])
results.append(total)
def main():
# 生成8个线程对象
threads = [Thread(target=compute) for i in range(20)]
# 启动多线程
for thread in threads:
thread.start()
# 主线程等待所有子线程执行完毕
for thread in threads:
thread.join()
# 多线程是共享进程里面的内存空间
print(results)
if __name__ == '__main__':
main()
进程池
from multiprocessing import Pool
from random import randint
def compute(n):
# 计算100万个1-100之间的随机数的和
total = sum([randint(1, 100) for i in range(1000000)])
return total
def main():
# 构建了一个进程池,进程数量是固定得
pool = Pool(processes=8)
result = pool.map(compute, range(8))
print(result)
if __name__ == '__main__':
main()
多进程
Python中的多线程⽆法利⽤多核优势 , 所以如果我们想要充分地使⽤多核CPU的资源 , 那么就只能靠多
进程了
multiprocessing模块中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等组件 , 与
threading模块有很多相似之处
from multiprocessing import Process
from random import randint
results = []
def compute():
# 计算100万个1-100之间的随机数的和
total = sum([randint(1, 100) for i in range(1000000)])
results.append(total)
def main():
processes = [Process(target=compute) for _ in range(8)]
for process in processes:
process.start()
for process in processes:
process.join()
# 每个进程有自己独立的内存空间,进程间不共享内存
# 如果要共享的话,要做多进程通信
print(results)
if __name__ == '__main__':
main()
进程队列/Queue
不同进程间内存是不共享的,要想实现两个进程间的数据交换。进程间通信有两种主要形式 , 队列和管
道
from multiprocessing import Process, Queue
import time
def f(q):
# 子进程往队列里面放东西
print('进入子进程')
time.sleep(3)
q.put('two')
print('出子进程')
def main():
# 实现一个支持进程通信的队列
q = Queue()
# 父进程往队列里放东西
q.put('one')
# 起一个子进程,让它执行f方法,并且把队列给他,args就是队列参数
process = Process(target=f, args=(q,))
process.start()
process.join()
# 非阻塞从队列里取东西,不会等待,如果没有东西会报错
print(q.get_nowait())
print(q.get_nowait())
# 阻塞等待,一直等到有东西才返回
# print(q.get())
# print(q.get())
if __name__ == '__main__':
main()
from multiprocessing import Process, Queue
from random import randint
def compute(q):
# 计算100万个1-100之间的随机数的和
total = sum([randint(1, 100) for i in range(1000000)])
q.put(total)
def main():
q = Queue()
processes = [Process(target=compute, args=(q,)) for i in range(8)]
for process in processes:
process.start()
for process in processes:
process.join()
while not q.empty():
print(q.get_nowait(), end=' ')
if __name__ == '__main__':
main()
管道/Pipe
The Pipe() function returns a pair of connection objects connected by a pipe which by default is
duplex (two-way).
from multiprocessing import Process, Pipe
def f(c_pipe):
print(c_pipe.recv())
print(c_pipe.recv())
c_pipe.send('儿子吐水1')
def main():
p_pipe, c_pipe = Pipe()
process = Process(target=f, args=(c_pipe,))
p_pipe.send('吐水1次')
p_pipe.send('吐水2次')
process.start()
print(p_pipe.recv())
print(p_pipe.recv())
if __name__ == '__main__':
main()
Manager
进程之间是相互独⽴的 ,Queue和pipe只是实现了数据交互,并没实现数据共享,Manager可以实现进
程间数据共享
Manager还⽀持进程中的很多操作 , ⽐如Condition , Lock , Namespace , Queue , RLock , Semaphore
等
from multiprocessing import Process, Manager
from random import randint
import os
def compute(result_dict):
# 计算100万个1-100之间的随机数的和
total = sum([randint(1, 100) for i in range(1000000)])
result_dict[os.getpid()] = total
def main():
# 创建一个Manager上下文环境
with Manager() as manager:
result_dict = manager.dict()
processes = [Process(target=compute, args=(result_dict,)) for i in range(8)]
for process in processes:
process.start()
for process in processes:
process.join()
print(result_dict)
if __name__ == '__main__':
main()
进程锁
from multiprocessing import Process, Lock
def f(i):
# 获取锁
# lock.acquire()
print(i)
# lock.release()
def main():
lock = Lock()
for i in range(100):
# lock.acquire()
process = Process(target=f, args=(i,))
process.start()
# lock.release()
process.join()
if __name__ == '__main__':
main()
进程池apply/apply_async
进程池内部维护⼀个进程序列,当使⽤时,则去进程池中获取⼀个进程,如果进程池序列中没有可供使
⽤的进程,那么程序就会等待,直到进程池中有可⽤进程为⽌。
进程池中有以下⼏个主要⽅法:
- apply:从进程池⾥取⼀个进程并执⾏
- apply_async:apply的异步版本
- terminate:⽴刻关闭进程池
- join:主进程等待所有⼦进程执⾏完毕,必须在close或terminate之后
- close:等待所有进程结束后,才关闭进程池
import os
import time
from multiprocessing import Pool
def f(i):
time.sleep(1)
# 打印进程号
print('子进程:', os.getpid())
return i + 100
def g(n):
print(n)
print('回调函数:', os.getpid())
def main():
pool = Pool(processes=3)
print('主进程:', os.getpid())
# pool.apply() 同步执行
# 异步执行
# 回调函数是主进程在处理
for i in range(5):
pool.apply_async(func=f, args=(i,), callback=g)
pool.close()
pool.join()
if __name__ == '__main__':
main()
协程greenlet
greenlet是⼀个⽤C实现的协程模块,相⽐与python⾃带的yield,它可以使你在任意函数之间随意切
换,⽽不需把这个函数先声明为generator
from greenlet import greenlet
def f1():
print(11)
g2.switch()
print(12)
g2.switch()
def f2():
print(21)
g1.switch()
print(22)
g1 = greenlet(f1)
g2 = greenlet(f2)
g1.switch()
Gevent
Gevent 是⼀个第三⽅库,可以轻松通过gevent实现并发同步或异步编程,在gevent中⽤到的主要模式
是Greenlet, 它是以C扩展模块形式接⼊Python的轻量级协程。
import time
import gevent
def f1():
print('start f1')
gevent.sleep(10)
print('end f1')
def f2():
print('start f2')
print('end f2')
def f3():
print('start f3')
gevent.sleep(20)
print('end f3')
def main():
gevent.joinall([gevent.spawn(f1), gevent.spawn(f2), gevent.spawn(f3)])
if __name__ == '__main__':
main()