我们知道现今的操作系统都支持“多任务”(Multitasking),虽然计算机的中央处理器(CPU)在同一时刻只能运行一个程序(双核心的话就是两个),但是由于CPU的速度极快,每秒能执行几十亿条机器语言指令,因此系统可以划分出微秒级的时间片,通过合理的任务调度快速切换执行程序,从人类的角度看就是在同时运行了。
对于操作系统来说,任务调度和资源分配的基本单元是“进程”(Process),不同程序在不同进程中运行。同一程序也可能会启动多个进程以高效地执行多任务,例如浏览器同时下载多个在线资源,IDE一边接受代码输入一边调用解释器检查语法等等——任务切换时需要记住各自进行到了哪一步,这种信息就称为“上下文”(Context)。在Windows系统中右击任务栏选择“任务管理器”即可显示当前进程列表,你会发现即使未打开任何应用,也有上百个进程正在后台运行。
之前我们编写的代码都是单路运行的,例如以下程序依次执行了两个工作任务:
"""xwork_1.py 主程序依次执行多个任务
"""
import time
def work(tasknum):
t1 = time.perf_counter()
print(f"任务{tasknum}开始……")
time.sleep(3)
print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
def main():
work(1)
work(2)
if __name__ == "__main__":
t1 = time.perf_counter()
main()
print(f"主程序耗时{time.perf_counter() - t1}秒。")
任务函数work()用time.sleep()模拟耗时3秒的操作,这样依次执行两个任务的总耗时就是6秒:
PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_1.py"
任务1开始……
任务1完成!耗时3.0185625689999998秒。
任务2开始……
任务2完成!耗时3.0008498049999996秒。
主程序耗时6.020425788秒。
多个耗时操作如果彼此没有关联,就可以通过“并发”(Concurrent)来避免无谓的等待。下面就让我们尝试编写并发执行多个任务的程序——Python标准库提供了multiprocessing模块来实现多进程,你可以调用进程类构造器multiprocessing.Process()创建进程实例,再调用实例的start()方法启动之,这样任务就能在不同进程中并发执行了:
"""xwork_2.py 多个进程并发执行多个任务
"""
import time
import multiprocessing
def work(tasknum):
t1 = time.perf_counter()
print(f"任务{tasknum}开始……")
time.sleep(3)
print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
def main():
multiprocessing.Process(target=work, args=(1,)).start()
multiprocessing.Process(target=work, args=(2,)).start()
if __name__ == "__main__":
t1 = time.perf_counter()
main()
print(f"主程序耗时{time.perf_counter() - t1}秒。")
以上程序用两个进程并发执行两个任务,在3秒之后同时完成,而在默认进程中执行的主程序因为没有耗时操作所以率先结束了(如果你希望等其它进程都完成再退出主程序,可以在进程启动后再调用实例的join()方法):
PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_2.py"
主程序耗时0.06413474699999999秒。
任务1开始……
任务2开始……
任务1完成!耗时2.993581393秒。
任务2完成!耗时2.993595187秒。
在单个进程内部也可以同时运行多个子任务,称为“线程”(Thread),线程相比进程更为轻量,建立和释放速度更快——通常耗时操作可分为两种:例如密码破解需要CPU进行大量运算,这称为CPU密集型应用;而网络爬虫主要处理数据的输入和输出(Input/Output),这称为IO密集型应用。前者宜采用多进程,后者则宜采用多线程。使用线程要引入标准库的threading模块,具体写法与使用进程类似:
"""xwork_3.py 多个线程并发执行多个任务
"""
import time
import threading
import sys
def work(tasknum):
t1 = time.perf_counter()
sys.stdout.write(f"任务{tasknum}开始……\n")
time.sleep(3)
sys.stdout.write(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。\n")
def main():
threading.Thread(target=work, args=(1,)).start()
threading.Thread(target=work, args=(2,)).start()
if __name__ == "__main__":
t1 = time.perf_counter()
main()
print(f"主程序耗时{time.perf_counter() - t1}秒。")
使用多线程要注意所谓“线程安全”问题——例如当多个线程都想用print()输出信息时,可能会因为没有抢到资源而出现异常。因此以上程序的工作函数输出信息用的是线程安全的sys.stdout.write(),运行结果如下:
PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_3.py"
任务1开始……
任务2开始……
主程序耗时0.0009290429999999975秒。
任务2完成!耗时3.008762067秒。
任务1完成!耗时3.0092474399999998秒。
Python 3.4新增了一种更适合IO密集型应用的特性“异步IO”(Asynchronous I/O),在不开多进程或多线程的情况下也能实现多任务并发。简单来说,当程序发起一个普通IO操作时,它会“阻塞”(Block)当前任务的执行直到操作结束,而所谓异步IO就是不等待IO操作结束就继续执行,这需要创建一个内部事件循环来轮番处理所有任务的状态。异步IO调度任务的基本单元称为“协程”(Coroutine)——函数之类的程序构件可统称为子程序或“例程”(Routine),一般都是从起点进入从终点退出;而协程是一种特殊例程,在进入后可以多次中断转往其他操作再返回(其实就是之前介绍过的生成器),这样就能有任意多个任务在事件循环中被切换执行了。
通过异步IO实现多任务并发需要引入标准库的asyncio模块并使用async/await语句:
- async def 定义协程函数用来返回协程对象/async with 指定异步上下文管理器用来生成可等待对象
- asyncio.create_task()/.gather() 将一个/多个协程打包为任务排入计划日程
- await 指定任务或其他可等待对象等待其完成并返回执行结果
- 在主程序中获取事件循环来运行作为顶层入口的协程函数
具体写法如下所示:
"""xwork_4.py 多个协程并发执行多个任务
"""
import time
import asyncio
async def work(tasknum):
t1 = time.perf_counter()
print(f"任务{tasknum}开始……")
await asyncio.sleep(3)
print(f"任务{tasknum}完成!耗时{time.perf_counter() - t1}秒。")
async def main():
tasks = asyncio.gather(work(1), work(2))
await tasks
if __name__ == "__main__":
t1 = time.perf_counter()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 在Spyder中用下面这句才行,因为IPyhon已启动事件循环
# asyncio.run_coroutine_threadsafe(main(), loop)
# 在Python 3.7中用下面这句即可,不必再去获取事件循环
# asyncio.run(main())
print(f"主程序耗时{time.perf_counter() - t1}秒。")
请注意协程版work()要用asyncio.sleep()来模拟异步IO耗时操作,因为time.sleep()会阻塞事件循环。程序运行结果如下:
PS D:\Test\pyStudy> python -u "d:\Test\pyStudy\basic\xwork_4.py"
任务1开始……
任务2开始……
任务1完成!耗时3.0036561820000003秒。
任务2完成!耗时3.0038537950000004秒。
主程序耗时3.019819151秒。
下面的示例是之前百度图片爬虫的协程版,程序比原来复杂一点但性能提升了许多倍——请注意urllib.request同样也是阻塞式的,所以要改用基于异步IO的第三方包aiohttp:
"""xwork_webcrawler.py 协程版百度图片搜索并批量下载
"""
import asyncio
import aiohttp
from urllib.parse import quote
import os
import re
import time
url = "https://image.baidu.com/search/flip?tn=baiduimage&word="
keyword = "CG原画"
folder = "img"
path = os.path.abspath(".")
headers = {
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) \
Gecko/20100101 Firefox/61.0'
}
async def fetch(session, url):
async with session.get(url, timeout=20) as res:
return await res.text()
async def store(session, url):
t1 = time.perf_counter()
async with session.get(url, timeout=20) as res:
with open(os.path.join(path, folder, url.split("/")[-1]), "wb") as f:
while True:
chunk = await res.content.read(512)
if not chunk:
break
f.write(chunk)
print(f"保存图片{url}耗时{time.perf_counter() - t1}秒。")
async def main():
if not os.path.exists(folder):
os.mkdir(folder)
async with aiohttp.ClientSession(headers=headers) as session:
t1 = time.perf_counter()
html = await fetch(session, url + quote(keyword))
links = re.findall(r'"objURL":"(.+?)"', html)
print(f"提取文本耗时{time.perf_counter() - t1}秒。")
for i in links:
await asyncio.create_task(store(session, i))
if __name__ == "__main__":
t1 = time.perf_counter()
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except Exception as e:
print(repr(e))
print(f"主程序耗时{time.perf_counter() - t1}秒。")
你还可以尝试配合使用协程和进程,在发挥异步IO执行效率的同时充分利用CPU的多个核心。
——编程原来是这样……
编程小提示:源码管理
当你学到这里,可以算是完全入门了,但在开始实际编程之前,你还应当了解“源码管理”(Source Code Management,简称SCM)或者叫“版本管理”因为它适用于任何需要保留修改记录的项目——最流行的源码管理工具是Git,官网下载页 https://git-scm.com/downloads
要学习Git直接看官方文档就好,有中文版 https://git-scm.com/book/zh/v2
你还可以加装一个图形界面的外壳,这样就无需记住Git命令:
- Windows下推荐TortoiseGit https://tortoisegit.org/
- 其他系统推荐Git-Cola https://git-cola.github.io/
许多网站提供免费Git服务,最大的一家是GitHub(https://github.com/),用国内的站点例如“码云”(https://gitee.com/)网速会快些——本教程的源码就放在这里,使用以下Git命令即可克隆到本机:
git clone https://gitee.com/freesand/pyStudy.git
如果你用VSCode,推荐安装这个插件GitLens: