在上节中,我们成功的在多进程中利用协程实现了多任务异步执行和多流程按次序执行的目标。本节我们将在原有代码的基础上继续改造代码,增加网页请求功能,实现一个简单的异步爬虫,实现每次爬新网页只需要关注网络请求、网页解析和数据处理,多进程和异步请求部分由爬虫自身处理。
详细流程图

需要用到的库
Beautifulsoup:一个可以从 HTML 或 XML 文件中提取数据的Python库。
# 安装方法
cd AiospiderWorkshop
pipenv shell
pipenv install beautifulsoup4
创建下载类 Downloader
我们以崔庆才崔老师建立的爬虫练习网站 https://scrape.center/ 为练习对象。我们用到的是其中最简单的一个网页 https://ssr1.scrape.center/page/1。阅读本节需要对 Beautifulsoup 库和 aiohttp 库有简单了解。
新建一个 py 文件,验证下载类 Downloader 的功能。
- 建立一个函数备用,从网页抽取电影名并打印到屏幕上。
from bs4 import BeautifulSoup
def extract_movie_name(html):
soup = BeautifulSoup(html, "html.parser")
name_tags = soup.find_all(class_="m-b-sm")
for name_tag in name_tags:
print(name_tag.string)
- 创建下载类
Downloader
Downloader 类主要有两个方法 get_async、download。
download:打开一个 session,异步请求 url 列表中的所有 url。
get_async:请求网页并返回网页 html。
import asyncio
from aiohttp import ClientSession
class Downloader:
async def get_async(self, session, url):
async with session.get(url=url) as resp:
return await resp.text()
async def download(self):
async with ClientSession() as session:
url_lst = [
"https://ssr1.scrape.center/page/1",
"https://ssr1.scrape.center/page/2"
]
download_tasks = list()
for url in url_lst:
download_task = asyncio.create_task(self.get_async(session, url))
download_tasks.append(download_task)
for task in download_tasks:
await task
result = task.result()
extract_movie_name(result)
def async_run(self):
asyncio.run(self.download())
- 编写主函数
main
if __name__ == "__main__":
downloader = Downloader()
downloader.async_run()
此时,下载类能够正常运行。
# 运行结果
霸王别姬 - Farewell My Concubine
这个杀手不太冷 - Léon
肖申克的救赎 - The Shawshank Redemption
...
整合下载类
目前我们的下载类还是一个单独的功能,我们需要将下载方法整合进现有代码,采用多进程方法调用下载方法,并通过下载队列交换数据。
- 改造
Bridge类
增加下载队列相关功能,原有代码不变。
download_queue:下载队列。
put_download_queue、get_download_queue、download_queue_empty 的功能不言自明。
class Bridge:
def __init__(self):
manager = Manager()
self.download_queue = manager.Queue()
def put_download_queue(self, workshop):
self.download_queue.put_nowait(workshop)
def get_download_queue(self):
return self.download_queue.get_nowait()
def download_queue_empty(self):
return self.download_queue.empty()
- 改造
Workshop类
增加 url、need_download、html 三个属性
class Workshop:
def __init__(self, url, need_download):
self.url = url
self.need_download = need_download
self.html = None
self._next_process = None
- 改造
MyWorkshop类
依据 Workshop 类的改变修改初始化代码,用本节的 extract_movie_name 方法稍加改造代替上节的两段模拟代码。
class MyWorkshop(Workshop):
def __init__(self, url, need_download):
super().__init__(url, need_download)
self.set_start_process(self.extract_movie_name)
async def extract_movie_name(self):
soup = BeautifulSoup(self.html, "html.parser")
name_tags = soup.find_all(class_="m-b-sm")
for name_tag in name_tags:
print(name_tag.string)
self.set_end()
- 改造
Downloader类
改造 async_run、__init__ 方法,使其可以接收信息传递类 Bridge 并保存。
增加 get_page 方法:接收 workshop,取出 url 交给 get_async 下载,下载好的 html 保存在 workshop 的 html 属性,之后置 workshop 的 need_download 属性为 False,返回 workshop 。
修改 download 方法:和 works 一样采用 bridge.work_end() 判断是否程序结束,从 download_queue 下载队列中取得 workshop,交给 get_page 方法处理,返回的 workshop 放入任务队列 work_queue 中进行下一步处理。
class Downloader:
def __init__(self):
self.bridge = None
async def get_async(self, session, url):
async with session.get(url=url) as resp:
return await resp.text()
async def get_page(self, session, workshop):
workshop.html = await self.get_async(session, workshop.url)
workshop.need_download = False
return workshop
async def download(self):
while not self.bridge.work_end():
async with ClientSession() as session:
download_tasks = list()
while not self.bridge.download_queue_empty():
workshop = self.bridge.get_download_queue()
task = asyncio.create_task(self.get_page(session, workshop))
download_tasks.append(task)
for task in download_tasks:
await task
workshop = task.result()
self.bridge.put_work_queue(workshop)
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.download())
- 改造
Works类
修改 run_works 方法:从 work_queue 拿到 workshop 后,判断其是否需要下载,如果需要下载就推入下载队列 download_queue 让下载进程下载。
其余部分保持不变。
class Works:
async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
if workshop.need_download:
self.bridge.put_download_queue(workshop)
continue
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)
- 改造
App类
下载进程作为一个新进程调用。
class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()
self.download = Downloader()
def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge,))
p_download = Process(target=self.download.async_run,
args=(self.bridge,))
p_run_works.start()
p_download.start()
p_run_works.join()
p_download.join()
- 改造主函数
main
在主函数中生成 Workshop 的列表,交给 App 执行即可。
if __name__ == "__main__":
work_lst = list()
url_template = "https://ssr1.scrape.center/page/{}"
for i in range(1, 11):
url = url_template.format(str(i))
work_lst.append(
MyWorkshop(url=url, need_download=True)
)
app = App()
app.async_run(work_lst)
至此,程序已可正常执行。
# 运行结果
霸王别姬 - Farewell My Concubine
...
魂断蓝桥 - Waterloo Bridge
运行时间:2.26s
本节完整代码
import asyncio
import time
from functools import wraps
from multiprocessing import Process
from multiprocessing import Manager
from aiohttp import ClientSession
from bs4 import BeautifulSoup
def print_run_time(func):
# 记录运行程序运行时间的装饰器
@wraps(func) # 保持被装饰的函数名不变,否则多进程调用出错
def wrap(*args, **kwargs):
start = time.time()
f = func(*args, **kwargs)
end = time.time()
print("运行时间:{:.2f}s".format(end - start))
return f
return wrap
class Bridge:
def __init__(self):
manager = Manager()
self.work_queue = manager.Queue()
self.download_queue = manager.Queue()
self.config_dict = manager.dict()
self.init_config()
def init_config(self):
self.config_dict["running_work_cnt"] = 0
self.config_dict["work_start_flag"] = False
def init_works(self, workshop_lst):
for workshop in workshop_lst:
self.put_work_queue(workshop)
self.work_cnt_increase()
def flag_start(self):
self.config_dict["work_start_flag"] = True
def work_end(self):
return self.config_dict["work_start_flag"]\
and not self.config_dict["running_work_cnt"]
def work_cnt_increase(self):
self.config_dict["running_work_cnt"] += 1
def work_cnt_decrease(self):
self.config_dict["running_work_cnt"] -= 1
def put_work_queue(self, workshop):
self.work_queue.put_nowait(workshop)
def get_work_queue(self):
return self.work_queue.get_nowait()
def work_queue_empty(self):
return self.work_queue.empty()
def put_download_queue(self, workshop):
self.download_queue.put_nowait(workshop)
def get_download_queue(self):
return self.download_queue.get_nowait()
def download_queue_empty(self):
return self.download_queue.empty()
class Workshop:
def __init__(self, url, need_download):
self.url = url
self.need_download = need_download
self.html = None
self._next_process = None
def set_start_process(self, func):
self._next_process = func
def set_next_process(self, func):
self._next_process = func
def set_end(self):
self._next_process = "/EOF"
def is_end(self):
return self._next_process == "/EOF"
async def run_next_process(self):
workshop = await self._next_process()
if workshop:
return workshop
else:
return self
class Works:
def __init__(self):
self.bridge = None
def distribute_works(self, task):
workshop = task.result()
if not workshop.is_end():
self.bridge.put_work_queue(workshop)
else:
self.bridge.work_cnt_decrease()
async def run_works(self):
self.bridge.flag_start()
while not self.bridge.work_end():
task_lst = list()
while not self.bridge.work_queue_empty():
workshop = self.bridge.get_work_queue()
if workshop.need_download:
self.bridge.put_download_queue(workshop)
continue
task = asyncio.create_task(workshop.run_next_process())
task_lst.append(task)
for task in task_lst:
await task
self.distribute_works(task)
@print_run_time
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.run_works())
class Downloader:
def __init__(self):
self.bridge = None
async def get_async(self, session, url):
async with session.get(url=url) as resp:
return await resp.text()
async def get_page(self, session, workshop):
workshop.html = await self.get_async(session, workshop.url)
workshop.need_download = False
return workshop
async def download(self):
while not self.bridge.work_end():
async with ClientSession() as session:
download_tasks = list()
while not self.bridge.download_queue_empty():
workshop = self.bridge.get_download_queue()
task = asyncio.create_task(self.get_page(session, workshop))
download_tasks.append(task)
for task in download_tasks:
await task
workshop = task.result()
self.bridge.put_work_queue(workshop)
def async_run(self, bridge):
self.bridge = bridge
asyncio.run(self.download())
class App:
def __init__(self):
self.works = Works()
self.bridge = Bridge()
self.download = Downloader()
def async_run(self, workshop_lst):
self.bridge.init_works(workshop_lst)
p_run_works = Process(target=self.works.async_run,
args=(self.bridge, ))
p_download = Process(target=self.download.async_run,
args=(self.bridge,))
p_run_works.start()
p_download.start()
p_run_works.join()
p_download.join()
class MyWorkshop(Workshop):
def __init__(self, url, need_download):
super().__init__(url, need_download)
self.set_start_process(self.extract_movie_name)
async def extract_movie_name(self):
soup = BeautifulSoup(self.html, "html.parser")
name_tags = soup.find_all(class_="m-b-sm")
for name_tag in name_tags:
print(name_tag.string)
self.set_end()
if __name__ == "__main__":
work_lst = list()
url_template = "https://ssr1.scrape.center/page/{}"
for i in range(1, 11):
url = url_template.format(str(i))
work_lst.append(
MyWorkshop(url=url, need_download=True)
)
app = App()
app.async_run(work_lst)
本节总结
经过本节的改造,我们已经得到了一个简单的异步爬虫。针对一系列新网页,只需要继承 Workshop 类,实现自己的爬取流程代码即可。当然,目前它只能胜任最简单的工作,没有考虑错误处理、定制请求参数、代理、日志等一系列问题,这些需要在日后的使用中慢慢完善。