# Author's note: this is too complicated; I do not fully understand it.
import requests
from lxml import etree
from urllib.parse import urlparse
from time import sleep
from threading import Thread
from queue import Queue
import sys
from redis import Redis
# IDLE marks a worker thread that is not currently processing a URL.
IDLE = 0
# WORKING marks a worker thread that is in the middle of processing a URL.
WORKING = 1
# Shared Redis client (server at 127.0.0.1:6379, database 10). The code below
# uses it for the "TODO_LIST" work queue and the "REQUEST_URL" set.
rds = Redis("127.0.0.1",6379,db=10)
# Decorator object that retries a failing call.
class retry(object):
    """Decorator that calls a function again when it raises a listed exception.

    The wrapped function is attempted up to ``max_tries + 1`` times: one
    initial call plus ``max_tries`` retries. Between attempts the wrapper
    sleeps for ``wait`` seconds. Exceptions that are not instances of
    ``exceptions`` propagate immediately.

    If every attempt fails, the wrapper returns ``None`` instead of raising,
    so callers must treat ``None`` as "no result".

    Args:
        max_tries: Number of retries after the first failed attempt.
        wait: Seconds to sleep before each retry.
        exceptions: Tuple of exception classes that trigger a retry.
    """

    def __init__(self, max_tries=3, wait=3, exceptions=(Exception,)):
        self.max_tries = max_tries
        self.wait = wait
        self.exceptions = exceptions

    def __call__(self, f):
        """Return a retrying wrapper around ``f``."""
        # Imported here so this block does not depend on a file-level import.
        from functools import wraps

        @wraps(f)  # keep the wrapped function's __name__ / __doc__
        def wrapper(*args, **kwargs):
            for i in range(self.max_tries + 1):
                try:
                    return f(*args, **kwargs)
                except self.exceptions as e:
                    print("waitting", e)
                    if i == self.max_tries:
                        # Last attempt failed: no retry follows, so do not
                        # sleep or announce one.
                        break
                    sleep(self.wait)  # back off before the next attempt
                    print("retry %s " % (i + 1))
            # All attempts failed; deliberately best-effort.
            return None

        return wrapper
# In-process set of URLs for which a request has completed.
REQUESTED_URL = set()


# Download a page.
@retry(3, 3)
def fetch(url, timeout=10):
    """Download ``url`` and return the response body as text.

    The function is wrapped by ``retry(3, 3)``, so an exception raised by the
    request (including a timeout) leads to further attempts, and the caller
    receives ``None`` if all of them fail.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely and the retry logic would never be triggered.

    Returns:
        The response text when the status code is 200, otherwise ``None``.
    """
    print(f'Fetching: {url}')
    res = requests.get(url, timeout=timeout)
    # Record the URL once the request has completed, whatever the status.
    REQUESTED_URL.add(url)
    if res.status_code == 200:
        return res.text
    return None
# Turn one raw href into a crawlable m.sohu.com URL, or None to skip it.
def _clean_href(ori_url):
    """Normalize a single ``href`` value.

    Returns ``None`` for links that cannot be parsed, that use a scheme other
    than http/https (``javascript:``, ``mailto:`` ...), or that point to a
    host other than ``m.sohu.com``.
    """
    try:
        # e.g. ParseResult(scheme='', netloc='', path='/a/249902322_102150',
        #                  params='', query='_f=m-index_business_news_9',
        #                  fragment='')
        parts = urlparse(ori_url)
    except ValueError:
        # Malformed href; skip it instead of aborting the page.
        return None

    # A missing scheme or host means the link is relative to this site.
    scheme = parts.scheme.strip() or "http"
    if scheme not in ("http", "https"):
        return None
    domain = parts.netloc.strip() or "m.sohu.com"
    if domain != "m.sohu.com":
        return None

    path = parts.path.strip()
    if path and not path.startswith("/"):
        # The page's own URL is not known here, so a relative path is
        # resolved against the site root to keep host and path separated.
        path = "/" + path
    query = f'?{parts.query}'.strip() if parts.query else ''
    return f'{scheme}://{domain}{path}{query}'


# Parse a page and collect the links worth crawling.
def paese(html):
    """Extract the m.sohu.com links from an HTML document.

    Args:
        html: Page source as ``str`` or ``bytes``; ``None`` or an empty value
            yields an empty result.

    Returns:
        A list of absolute URLs (duplicates are not removed), built from the
        ``href`` of every ``<a>`` element that points to ``m.sohu.com``.
    """
    # Nothing to parse for failed or empty downloads.
    if not html:
        return []
    doc = etree.HTML(html)
    if doc is None:
        return []

    url_list = []
    for ori_url in doc.xpath("//a/@href"):
        url = _clean_href(ori_url)
        if url is not None:
            url_list.append(url)
    return url_list
# Download one page and enqueue every link found on it.
def get_and_parse(url, url_queue):
    """Fetch ``url``, parse the result and put each extracted link on ``url_queue``.

    Args:
        url: Address of the page to download.
        url_queue: Object with a ``put`` method that receives the links.
    """
    page = fetch(url)
    links = paese(page)
    for link in links:
        url_queue.put(link)
# #定义一个函数,用于处理线程
# def process(url_list):
# queue = Queue()
# workers = []
# for url in url_list:
# t = Thread(target=get_and_parse,args=(url,queue))
# t.setDaemon(True)
# workers.append(t)
# t.start()
# for t in workers:
# t.join()
# return list(queue.queue)
# Multi-threaded crawler worker.
class Spider(Thread):
    """Worker thread that crawls URLs taken from the Redis list ``TODO_LIST``.

    Each worker repeatedly pops a URL, downloads it, records it in the Redis
    set ``REQUEST_URL`` and pushes the not-yet-requested links found on the
    page back onto ``TODO_LIST``. ``stat`` is ``WORKING`` while a URL is being
    processed and ``IDLE`` otherwise.
    """

    def __init__(self):
        super().__init__()
        self.stat = IDLE

    def is_idle(self):
        """Return True when this worker is not processing a URL."""
        return self.stat == IDLE

    def run(self):
        """Process URLs from ``TODO_LIST`` forever."""
        while True:
            # blpop blocks until an item is available and returns (key, value).
            url = rds.blpop("TODO_LIST")[1]
            self.stat = WORKING
            try:
                self._crawl(url)
            except Exception as e:
                # Worker-loop boundary: report the failure and keep serving
                # the queue rather than letting one bad page kill the thread.
                print("crawl failed", url, e)
            finally:
                # Always return to IDLE so a failure cannot leave this worker
                # looking busy forever.
                self.stat = IDLE

    def _crawl(self, url):
        """Download one URL and enqueue the new links found on it."""
        # Redis hands values back as bytes unless the client decodes them.
        if isinstance(url, bytes):
            url = url.decode('utf-8')
        html = fetch(url)
        # Record the URL as requested (even if the download failed) so the
        # de-duplication below and the REQUEST_URL counter have data.
        rds.sadd("REQUEST_URL", url)
        # Compare as bytes, the form in which Redis returns set members.
        url_list = {link.encode('utf-8') for link in paese(html)}
        url_list.difference_update(rds.smembers("REQUEST_URL"))  # de-duplicate
        if url_list:
            rds.lpush("TODO_LIST", *url_list)
def main(max_threads):
    """Seed the work queue, start the workers and exit when the crawl is done.

    Args:
        max_threads: Number of ``Spider`` worker threads to start.

    The function does not return: it calls ``sys.exit(0)`` once ``TODO_LIST``
    is empty and every worker reports idle.
    """
    # Seed the queue with the start page.
    print("Start")
    print(rds.lpush("TODO_LIST", "http://m.sohu.com/"))

    # Create and start the workers.
    spiders = [Spider() for _ in range(max_threads)]
    for spd in spiders:
        # Workers loop forever on a blocking Redis pop. Marking them as
        # daemon threads lets sys.exit() below actually end the process.
        spd.daemon = True
        spd.start()

    # Poll once per second until there is nothing left to do.
    while True:
        # all(...) is required here: a bare list of booleans is truthy
        # whenever it is non-empty, even if every worker is busy.
        # NOTE: a worker that has popped a URL but not yet set its state to
        # WORKING can still make this check pass early.
        if rds.llen("TODO_LIST") == 0 and all(spd.is_idle() for spd in spiders):
            # Queue is empty and every worker is idle: the crawl is finished.
            print("所有的工作都完成了")
            sys.exit(0)
        print("REQUESRED %d" % rds.scard("REQUEST_URL"))
        sleep(1)
# Script entry point: python <script> MAX_THREADS
if __name__ == '__main__':
    # The thread count is mandatory; fail with a clear message when it is
    # missing instead of leaving max_threads undefined.
    if len(sys.argv) < 2:
        sys.exit("usage: python %s MAX_THREADS" % sys.argv[0])
    try:
        max_threads = int(sys.argv[1])
    except ValueError:
        sys.exit("MAX_THREADS must be an integer, got %r" % sys.argv[1])
    # With zero workers the seeded queue would never drain.
    if max_threads < 1:
        sys.exit("MAX_THREADS must be at least 1")
    main(max_threads)