从一个代理池讲起?
搞爬虫的一般都有自己的代理池,代理池的结构一般分为抓取模块,存储模块,检测模块,api模块。
抓取模块本身也是一个爬虫,它会爬取个大免费代理网站的页面,解析。最后把数据交给存储模块。
假设现在我们要爬取一个代理网站http://www.website1.com/free 的前10页。对应第n页的URL应该是这样的
http://www.website1.com.free/n/ 。这是一个简单的爬虫,我们可以轻松的实现:
import time
# 计算时间的装饰器
def costtime(func):
def wrapper(*args, **kwargs):
start = time.perf_counter()
func(*args, **kwargs)
print(time.perf_counter() - start)
return wrapper
# 获取页面
def getPage(url):
print(f"crawling: {url}")
time.sleep(2)
return url
# 解析页面
def parsePage(page):
print(f'parsing {page} done!')
time.sleep(0.2)
return page
# 单个url的调度函数
def schedule(url):
page = getPage(url)
res = parsePage(page)
@costtime
def main():
start_url = 'http://www.website1.com/{}/'
for i in range(5):
schedule(start_url.format(i))
if __name__ == '__main__':
main()
这里我们使用time.sleep(2)模仿等待响应的过程过程。整个程序运行下来的时间等于:
总时间 = 当个页面时间 * 页面数
整个程序跑下来花了2.2s * 6 = 13.2s
可以优化么?
让我们思考以下几个问题:
- 不同页之间的爬虫线程有数据关联么?爬取第一页的数据会影响第二页的数据么?
- 不同页之间的爬虫线程之间有优先级么?比如一定要爬取了第一页才能爬取第二页?
很明显,答案是否定的。
所以我们尝试引入asyncio异步库打乱线程的优先级:
import time
import asyncio
# 计算时间的装饰器
def costtime(func):
def wrapper(*args, **kwargs):
start = time.perf_counter()
func(*args, **kwargs)
print(time.perf_counter() - start)
return wrapper
# 获取页面
async def getPage(url):
print(f"crawling: {url}")
await asyncio.sleep(2)
return url
# 解析页面
def parsePage(page):
print(f'parsing {page} done!')
time.sleep(0.2)
return page
# 单个url的调度函数
async def crawlSingleUrl(url):
page = await getPage(url)
res = parsePage(page)
async def schedule(start_url, page):
tasks = []
for i in range(page):
tasks.append(crawlSingleUrl(start_url.format(i)))
await asyncio.gather(*tasks)
@costtime
def main():
start_url = 'http://www.website1.com/{}/'
page = 5
asyncio.run(schedule(start_url, page))
if __name__ == '__main__':
main()
这里我们使用asyncio进行并发处理:
- 协程化: async语句会将指定函数封装成coroutine协程对象 。协程对象的特性是可以将处理机自由让出。
- 挂起协程: await语句会将协程挂起,等到合适的时机重运行进程。
- 收集协程: asyncio.gather方法会将协程对象自动封装成task,押入运行loop中,一个个的执行。
- 启动协程集: asyncio.run运行loop中的coroutine对象,等同于之前的语法:
loop = asyncio.get_loop_event()
loop.run_until_complete(asyncio.wait(task1,task2....))
现在回到了我们之前的程序, 我们把会阻塞的地方---getPage函数定义为协程,在请求代码前面加上了await,让其让出处理机。
那么处理机就运行其他协程的getPage代码了。
这样我们就实现了在某页等待response时,去发起下一页的requests或者页面解析。
整个程序跑下来只花了3.1s左右。
还可以再优化么?
之前我们说过代理池的抓取模块是从各大免费代理网站抓取的。那么每个网站的抓取除了页面解析规则不同之外,其他的都一样。
让我们进一步思考,如果把不同网站比做上面同一网站的不同页的话,是否可以得出如下结论:
- 不同网站的数据获取没有先后关系
所以我们也给每个网站的获取加上异步并发:
import time
import asyncio
# 计算时间的装饰器
def costtime(func):
def wrapper(*args, **kwargs):
start = time.perf_counter()
func(*args, **kwargs)
print(time.perf_counter() - start)
return wrapper
# 获取页面
async def getPage(url):
print(f"crawling: {url}")
await asyncio.sleep(2)
return url
# 解析页面
def parsePage(page):
print(f'parsing {page} done!')
time.sleep(0.2)
return page
# 单个url的调度函数
async def crawlSingleUrl(url):
page = await getPage(url)
res = parsePage(page)
# 爬取单个网站的所有页面
async def crawlWebsie(start_url, page):
tasks = []
for i in range(page):
tasks.append(crawlSingleUrl(start_url.format(i)))
await asyncio.gather(*tasks)
async def schedule():
start_url1 = 'http://www.website1.com/{}/'
start_url2 = 'http://www.website2.com/{}'
start_url3 = 'http://www.website3.com/{}'
page = 5
await asyncio.gather(crawlWebsie(start_url1, page), crawlWebsie(start_url2, page), crawlWebsie(start_url3, page))
@costtime
def main():
asyncio.run(schedule())
if __name__ == '__main__':
main()
这里我们加多了一个crawlWebsite函数,用来爬取整个网站的所有页面。然后在schedule调度方法里把不同的crawlWebsite收集到(gather)loop里面。
在main函数中运行loop。
这样我们就实现了不同网页间的并发,假设A站点响应速度比较缓慢的话,程序会把A挂起,运行B,C站点的协程。
整个程序跑下来花费了5s左右,很出乎意料。
这样我们的就实现了两个层级的并发,一个是同一网页间的不同页面的并发。另一个是不同网站之间的并发。
最后
异步的本质是打破线程之间的优先级,即让线程共同去竞争GIL,这点有点类似于多线程模块threading。但是不同的是,协程之间的切换消耗会比线程之间的切换消耗小。
如果把抓取模块中的最小单位规定为每一页的话。那么抛开网站而言,所有代理网站的请求集合都是页的集合,而我们所做的只是消除这些不相关页运行的先后顺序。
一句话:众页平等
基于上面的异步编程我写了一个代理池,可用于爬取各大网站,效率还是挺不错的。
基于异步的代理池