大家好,我是剑南!
为了做一篇教程,我竟把一个小说网站给搞崩溃了,着实给我下了一跳,每次都是报出503的错误代码,意思是服务器不可访问,就是因为我用协程写了个爬虫程序。
注意:本文仅仅提供学习使用,不可破坏网络,否则后果自负!!
因为服务器接受不了这么大的压力,导致资源暂时无法访问,所以当我停止爬虫程序的时候,该小说网站逐渐恢复正常。
如果你有认真阅读我的博文的话,你会发现对多线程、队列、多进程的文章我分别只总结了一篇,但是关于协程的文章,今天是我第五次写了,说实话,协程涉及到的坑太多,也不容易,需要一次次总结自己所遇到的问题以及优化之前的代码。
关于多线程、多进程、队列等知识,我现在用到的比较少,因此总结的只有一篇,望读者见谅。
协程
协程的本质是单线程,它只是利用了程序中的延时时间,在不断的切换所执行的代码块。协程切换任务效率高,利用线程延时等待的时间,因此在实际处理时优先考虑使用协程。
初识异步http框架httpx
对协程不了解的小伙伴可以考虑翻出我之前写的文章,做简单的了解。对于requests库相信大家都不会陌生,但是requests中实现的http请求是同步请求,但是其实基于http请求的I/O阻塞特性,非常适合用协程来实现异步http请求。
httpx继承了所有requests的特性并且支持异步http请求的开源库。
安装httpx
pip install httpx
实践
接下来我将使用httpx同步与异步的方式对批量的http请求进行耗时比较,来一起看看结果吧。
import httpx
import threading
import time
def send_requests(url, sign):
status_code = httpx.get(url).status_code
print(f'send_requests:{threading.current_thread()}:{sign}: {status_code}')
start = time.time()
url = 'http://www.httpbin.org/get'
[send_requests(url, sign=i) for i in range(200)]
end = time.time()
print('运行时间:', int(end - start))
代码比较简单,可以看出send_requests中实现了同步访问了目标地址200次。
部分运行结果,如下所示:
send_requests:<_MainThread(MainThread, started 9552)>:191: 200
send_requests:<_MainThread(MainThread, started 9552)>:192: 200
send_requests:<_MainThread(MainThread, started 9552)>:193: 200
send_requests:<_MainThread(MainThread, started 9552)>:194: 200
send_requests:<_MainThread(MainThread, started 9552)>:195: 200
send_requests:<_MainThread(MainThread, started 9552)>:196: 200
send_requests:<_MainThread(MainThread, started 9552)>:197: 200
send_requests:<_MainThread(MainThread, started 9552)>:198: 200
send_requests:<_MainThread(MainThread, started 9552)>:199: 200
运行时间: 102
从运行结果上可以看到,主线程是按照顺序执行的,因为这是同步请求。
程序共耗时102秒。
它来了,它来了,下面就让我们试试异步的http请求,看看它会给我们带来什么样的惊喜。
import asyncio
import httpx
import threading
import time
client = httpx.AsyncClient()
async def async_main(url, sign):
response = await client.get(url)
status_code = response.status_code
print(f'{threading.current_thread()}:{sign}:{status_code}')
def main():
loop = asyncio.get_event_loop()
tasks = [async_main(url='https://www.baidu.com', sign=i) for i in range(200)]
async_start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
async_end = time.time()
loop.close()
print('运行时间:', async_end-async_start)
if __name__ == '__main__':
main()
部分运行结果,如下所示:
<_MainThread(MainThread, started 13132)>:113:200
<_MainThread(MainThread, started 13132)>:51:200
<_MainThread(MainThread, started 13132)>:176:200
<_MainThread(MainThread, started 13132)>:174:200
<_MainThread(MainThread, started 13132)>:114:200
<_MainThread(MainThread, started 13132)>:49:200
<_MainThread(MainThread, started 13132)>:52:200
运行时间: 1.4899322986602783
看到这个运行时间有没有让你吓一大跳,居然在1秒多的时间里,向百度访问了200次。速度快到飞起。
限制并发数
前面我讲过并发数太大会导致服务器崩溃,因此我们要考虑限制并发数,那么当asyncio与httpx结合的时候应该怎么样限制并发数呢?
使用Semaphore
asyncio其实自带了一个限制协程数量的类,叫做Semaphore。我们只需要初始化它,传入最大允许协程数量,然后就可以通过上下文管理器。具体代码如下所示:
import asyncio
import httpx
import time
async def send_requests(delay, sem):
print(f'请求一个延时为{delay}秒的接口')
await asyncio.sleep(delay)
async with sem:
# 执行并发的代码
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get('http://www.httpbin.org/get')
print(resp)
async def main():
start = time.time()
delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8]
task_list = []
sem = asyncio.Semaphore(3)
for delay in delay_list:
task = asyncio.create_task(send_requests(delay, sem))
task_list.append(task)
await asyncio.gather(*task_list)
end = time.time()
print('一共耗时:', end-start)
asyncio.run(main())
部分运行结果,如下所示:
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
<Response [200 OK]>
一共耗时: 9.540421485900879
但是,如果想要在1分钟内只有3个协程,又该如何处理呢?
只需要将代码改成如下图所示就行:
async def send_requests(delay, sem):
print(f'请求一个延时为{delay}秒的接口')
await asyncio.sleep(delay)
async with sem:
# 执行并发的代码
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get('http://www.httpbin.org/get')
print(resp)
await asyncio.sleep(60)
总结
如果大家要限制协程的并发数,那么最简单的方式就是使用Semaphore。但是需要注意的是,只能在启动协程之前初始化,然后传给协程,确保并发协程拿到的是同一个Semaphore对象。
当然,在程序里面也有可能出现不同部分,每个部分的并发数可能是不同的,因此需要初始化多个Semaphore对象。
实战-笔趣阁
网页分析
首先在小说的主页,可以发现所有小说的章节链接都在dd标签下的a标签内的href属性中。
首先第一步要做的就是拿到所有的章节链接。
接下来要做的就是,进入每一个章节,获取其中的内容。
从上图可以看到,文章内容在<div id="content">标签中,在图片中可以发现大量的换行,因此在写代码时需要做进一步去除空格的处理。
获取网页源码
async def get_home_page(url, sem):
async with sem:
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get(url)
resp.encoding = 'utf-8'
html = resp.text
return html
获取所有的章节链接
async def parse_home_page(sem):
async with sem:
url = 'https://www.biqugeu.net/13_13883/'
html = etree.HTML(await get_home_page(url, sem))
content_urls = ['https://www.biqugeu.net/' + url for url in html.xpath('//dd/a/@href')]
return content_urls
在这里需要注意,我多做了一个操作那就是拼接url,因为我们抓取到的url并不是完整的因此需要做简单的拼接。
保存数据
async def data_save(url, sem):
async with sem:
html = etree.HTML(await get_home_page(url, sem))
title = html.xpath('//h1/text()')[0]
contents = html.xpath('//div[@id="content"]/text()')
print(f'正在下载{title}')
for content in contents:
text = ''.join(content.split())
with open(f'./金枝2/{title}.txt', 'a', encoding='utf-8') as f:
f.write(text)
f.write('\n')
将上面获取到的url传入data_save()函数中,对每一个url进行解析,获取文本内容,再进行保存。
创建协程任务
async def main():
sem = asyncio.Semaphore(20)
urls = await parse_home_page(sem)
tasks_list = []
for url in urls:
task = asyncio.create_task(data_save(url, sem))
tasks_list.append(task)
await asyncio.gather(*tasks_list)
结果展示
不到一分钟的时间,便将所有的小说都抓取下来了,试想一下,如果是普通爬虫要多久?
起码737秒!!
最后
这次会是我最后一次写协程码?肯定不是啦,还有一篇关于异步网络请求库Aiohttp,等我遇到之后再分享给大家。
本次分享到这里就结束了,如果你看到了这里,希望你可以给我点个【赞】与【再看】,如果可以,请你分享给更多的人一起学习。
文章的每一个字都是我用心写出来的,你的【点赞】会让我知道,你就是那个和我一起努力的人。