Hello everyone. I was busy last weekend and didn't have time to post, so this week I'm writing about something a little different.
Strictly speaking this isn't really a framework, just a simple wrapper around a few things (please go easy on me <( ̄3 ̄)>), plus Redis and MongoDB, which serve as the task queue and as a record of the requests that have been made, respectively (the MongoDB part was originally meant for URL filtering, but I haven't figured out how to write that yet, so for now I'm putting up what already runs).
Design
Nothing fancy here, just an ordinary flow chart.
The Request here is a custom class I wrote myself (yes, the naming was rather careless O(∩_∩)O haha~). The basic idea is to wrap everything into a Request object first, which contains the following:
url
headers
proxy
downloader
pipeline, etc.
A newly created Request object is handed to the Manager at initialization, serialized, and stored in a Redis queue keyed by the request's ID; only then does the crawler process actually start. Once the manager is running, it enters a loop, continuously popping data from the queue and deserializing it back into Request objects.
After an object is popped, its category is checked; if it carries its own downloader, that downloader is used to make the request. (Usually these categories also come with a callback function, so the response goes straight into the callback, where it is parsed and turned into new Request objects.)
That is the rough flow.
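Before getting into the code, here is a minimal sketch of that queue round-trip, assuming a local Redis server and the redis-py client; the key name and the payload dict below are made up purely for illustration, but the mechanism (pickle plus rpush/lpop) is exactly what Manager does further down.
import pickle
import redis

rdb = redis.StrictRedis(host='localhost', port=6379, db=0)

# Producer side: serialize the request's dict form and push it onto the queue.
payload = {'url': 'http://example.com/page/1', 'name': 'demo', 'category': 'text'}
rdb.rpush('demo-request-queue', pickle.dumps(payload, protocol=-1))

# Consumer side: pop, deserialize, and hand the dict back to something like Request.from_dict().
raw = rdb.lpop('demo-request-queue')
if raw is not None:
    restored = pickle.loads(raw)
    print(restored['url'])  # -> http://example.com/page/1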
The custom Request class
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@File : request.py
@Time : 2017/10/21 0021 17:25
@Author : Empty Chan
@Contact : chen19941018@gmail.com
@Description: The custom Request class
"""
import json
import click
from hashlib import md5
import time
import datetime
from utils import ua
from downloader import HttpDownloader
class Request(object):
"""
    Request class
"""
def __init__(self, url, name, title=None,
folder=None, headers=None, callback=None, pipeline=None,
downloader=None,
category=None, proxy=None):
"""
初始化
:param url: 请求链接
:param name: 请求名称,用来作为存放的文件夹名称以及mongo的集合名
:param title: 存放到mongo的信息
:param folder: 存放image或者video的子文件夹名称或者存放text的文件名
:param headers: 请求头
:param callback: 回调函数
:param pipeline: 处理管道
:param downloader: 下载器
:param category: 类别,定义在工具类中,作为mongo的集合名
:param proxy: 代理
"""
        super(Request, self).__init__()
self.name = name
self.category = category
self.url = url
headers_temp = {"User-Agent": ua.random}
if headers:
headers_temp.update(headers)
r = md5()
__id = '{url}+{headers}'.format(url=url, headers=headers_temp)
r.update(__id.encode('utf-8'))
self.id = r.hexdigest()
self.title = title
self.folder = folder
self.headers = headers_temp
self.pipeline = pipeline
if not downloader:
downloader = HttpDownloader
self.downloader = downloader
self.callback = callback
self.proxy = proxy
def __call__(self, *args, **kwargs):
"""
存放到mongo
:param args: 位置参数
:param kwargs: 命名参数
:return:
"""
return {"_id": self.id,
"name": self.name,
"url": self.url,
"title": self.title,
"folder": self.folder,
"category": self.category,
"date": datetime.datetime.utcnow(),
"timestamp": time.time() * 1000}
def to_dict(self):
"""
序列化
:return:
"""
return {"name": self.name,
"url": self.url,
"title": self.title,
"headers": self.headers,
"folder": self.folder,
"pipeline": self.pipeline,
"category": self.category,
"downloader": self.downloader,
"callback": self.callback,
"proxy": self.proxy}
@staticmethod
def from_dict(di):
"""
反序列化
:param di: 从redis取出的序列化的数据
:return:
"""
cb = di['callback']
# import dictionary
# callback = dictionary.TASK[cb]
name = di['name']
title = di['title'] if di['title'] else None
headers = di['headers'] if di['headers'] else None
folder = di['folder'] if di['folder'] else None
proxy = di['proxy'] if di['proxy'] else None
return Request(url=di['url'],
name=name,
title=title,
headers=headers,
folder=folder,
pipeline=di['pipeline'],
category=di['category'],
downloader=di['downloader'],
callback=cb,
proxy=proxy)
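As a quick sanity check, a Request can be built and round-tripped by hand. This is only a sketch: it assumes utils.ua and downloader.HttpDownloader import cleanly as in the repo, and the URL, name and category strings are placeholders (the real category values are the constants defined in utils).
from request import Request

req = Request(url='http://example.com/page/1',
              name='demo',
              title='an example entry',
              category='text')      # placeholder; real code uses the constants from utils

print(req.id)         # md5 of url + headers, also used as the Mongo _id
print(req())          # the document that would be inserted into Mongo

data = req.to_dict()              # what gets pickled into Redis
clone = Request.from_dict(data)   # what Manager rebuilds after lpop
assert clone.url == req.url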
The Manager class
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@File : manager.py
@Time : 2017/11/5 0005 11:33
@Author : Empty Chan
@Contact : chen19941018@gmail.com
@Description: The main manager class
"""
import json
import click
import time
from request import Request
import grequests
from log_util import Log
from redis_util import rdb
from retrying import retry
from db_store import mongo_map
from utils import TEXT, INDEX, IMAGE, VIDEO, NEXT, DETAIL
import pickle as cPickle
def exception_handler(requests, exception):
"""
grequests的错误异常处理
:param requests: 请求链接
:param exception: 异常信息
:return: None
"""
click.echo(exception)
class Manager(object):
"""
管理器
"""
def __init__(self, start_request: Request):
"""
初始化
:param start_request: 初始化的自定义request
"""
super().__init__()
self.rdb = rdb
self.start_request = start_request
self.append_spider(start_request)
        self.task_list = []  # buffer for concurrent batch requests
self.logger = Log(name='Manager')
self.req_count = 0
self.count = 0
def append_spider(self, req):
"""
添加到redis
:param req: 自定义request
:return: None
"""
temp = cPickle.dumps(req.to_dict(), protocol=-1)
self.rdb.rpush(self.start_request.id, temp)
@retry(stop_max_attempt_number=3, wait_random_min=0, wait_random_max=200)
def __request(self, spiders: list):
"""
并发批量请求,用于image和video
:param spiders: 自定义request集合
:return: 请求的数据集合
"""
url_list = []
self.logger.info('start batch request!')
for url in spiders:
if url.proxy:
url_list.append(grequests.get(url.url, headers=url.headers, proxies=url.proxy, timeout=10))
else:
url_list.append(grequests.get(url.url, headers=url.headers, timeout=10))
self.logger.info('all complete!')
return grequests.map(url_list, exception_handler=exception_handler)
def handle(self, spider: Request):
"""
redis中取出的request处理
:param spider: 自定义的request
:return: None
"""
gallery = mongo_map(spider.name)
self.req_count += 1
if self.req_count == 50:
time.sleep(3)
self.req_count = 0
        if spider.category == IMAGE \
                or spider.category == VIDEO:
            retry_list = []
            if not gallery.find_one({"_id": spider.id}):
                gallery.insert_one(spider())
            if not spider.pipeline.exist(spider.pipeline, spider):
                self.task_list.append(spider)
            if 20 >= len(self.task_list) >= 10 or self.count < 10:  # flush the buffered tasks concurrently
                res_list = self.__request(self.task_list)
                for i, res in enumerate(res_list):
                    if not self.task_list[i].pipeline.store(self.task_list[i].pipeline, data=res, spider=self.task_list[i]):
                        retry_list.append(self.task_list[i])  # re-queue the task whose store failed
                self.task_list.clear()
                self.task_list.extend(retry_list)
                retry_list.clear()
elif spider.category == INDEX \
or spider.category == NEXT \
or spider.category == DETAIL:
# start = time.time()
res = spider.downloader.request(spider.downloader, spider=spider)
# end = time.time()
# click.echo('request consume %s' % str(end - start))
spider.pipeline.store(spider.pipeline, data=res, spider=spider)
# start = time.time()
# click.echo('store consume %s' % str(start - end))
if spider.callback:
result = spider.callback(res, spider)
for sp in result:
if not gallery.find_one({"_id": sp.id}):
gallery.insert_one(sp())
self.append_spider(sp)
elif spider.category == TEXT:
res = spider.downloader.request(spider.downloader, spider=spider)
spider.pipeline.store(spider.pipeline, data=res, spider=spider)
def run(self):
"""
运行
:return: None
"""
self.count = self.rdb.llen(self.start_request.id)
while self.count:
spider = None
try:
click.echo('start spider.....')
start = time.time()
temp = self.rdb.lpop(self.start_request.id)
by = cPickle.loads(temp)
spider = Request.from_dict(by)
end = time.time()
click.echo('get spider consume %s' % str(end - start))
self.handle(spider)
click.echo('handle complete!')
except Exception as e:
# time.sleep(0.1)
# if spider:
# self.append_spider(spider)
raise e
self.count = self.rdb.llen(self.start_request.id)
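To tie the two classes together, a crawl might be kicked off roughly like this. This is only a hedged sketch: parse_index is a hypothetical callback, the example URL and name are placeholders, and it assumes Redis and MongoDB are running as the Manager expects; the pipeline, downloader and category constants come from pipeline.py, downloader.py and utils in the repo.
from request import Request
from manager import Manager
from pipeline import ConsolePipeline
from utils import INDEX


def parse_index(response, spider):
    """Hypothetical parser: turn one response into follow-up Requests."""
    # parse response.text here and return new Request objects
    return []   # handle() simply iterates over whatever the callback returns


if __name__ == '__main__':
    start = Request(url='http://example.com/index',
                    name='demo',
                    category=INDEX,
                    pipeline=ConsolePipeline,   # passed as a class, as handle() expects
                    callback=parse_index)
    Manager(start).run()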
The Pipeline classes
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@File : pipeline.py
@Time : 2017/10/21 0021 16:31
@Author : Empty Chan
@Contact : chen19941018@gmail.com
@Description: File processing pipelines
"""
import json
import click
import os
from requests import Response
from utils import TEXT, IMAGE, VIDEO
from request import Request
import abc
class BasePipeline(abc.ABC):
def __init__(self):
pass
@abc.abstractmethod
def store(self, data: Response, spider: Request) -> bool:
pass
@abc.abstractmethod
def exist(self, spider: Request) -> bool:
pass
class FilePipeline(BasePipeline):
"""
文件处理管道
"""
def __init__(self):
super().__init__()
def store(self, data: Response, spider: Request) -> bool:
"""
储存文件
:param data: requests返回的数据
:param spider: 请求的自定义request
:return: 存储是否成功
"""
if not data:
click.echo('data is None')
return False
main_folder = str(spider.name).lower()
if not os.path.exists('../{0}/{1}'.format(spider.category, main_folder)):
os.mkdir('../{0}/{1}'.format(spider.category, main_folder))
if spider.category == IMAGE:
query = '../{0}/{1}/{2}/{3}.jpg'.format(spider.category, main_folder, spider.folder, spider.id)
with open(query, mode='wb') as f:
f.write(data.content)
click.echo("save %s in %s" % (spider.category, query))
click.echo("save %s===>>>%s" % (spider.category, spider.url))
elif spider.category == TEXT:
query = '../{0}/{1}/{2}.txt'.format(spider.category, main_folder, spider.folder)
if spider.callback:
result = spider.callback(data, spider)
with open(query, mode='w', encoding='utf-8') as f:
f.writelines(result.get('title'))
f.writelines(result.get('content'))
click.echo("save %s in %s" % (spider.category, query))
click.echo("save %s===>>>%s" % (spider.category, spider.url))
return True
def exist(self, spider: Request) -> bool:
"""
判断文件是否存在
:param spider: 请求的自定义request
:return: 文件是否存在
"""
main_folder = str(spider.name).lower()
query = None
if spider.category == IMAGE:
query = '../{0}/{1}/{2}/{3}.jpg'.format(spider.category, main_folder, spider.folder, spider.id)
elif spider.category == TEXT:
query = '../{0}/{1}/{2}.txt'.format(spider.category, main_folder, spider.folder)
if not query:
return False
if os.path.exists(query):
return True
return False
class FolderPipeline(BasePipeline):
"""
文件夹处理
"""
def __init__(self):
super().__init__()
def store(self, data: Response, spider: Request) -> bool:
"""
文件夹处理
:param data: requests返回的数据
:param spider: 请求的自定义request
:return: 文件夹创建是否成功
"""
click.echo("***************")
click.echo(spider.id)
click.echo(spider.name)
click.echo(spider.url)
main_folder = str(spider.name).lower()
query = '.'
if spider.category == TEXT or spider.category == IMAGE:
query = '../{0}/{1}'.format(spider.category, main_folder)
if not os.path.exists(query):
os.mkdir(query)
if spider.category == IMAGE:
if not os.path.exists('%s/%s' % (query, spider.folder)):
os.mkdir('%s/%s' % (query, spider.folder))
click.echo(' create folder=>>> %s/%s' % (query, spider.folder))
click.echo("@@@@@@@@@@@@@@@")
return True
def exist(self, spider: Request) -> bool:
return False
class ConsolePipeline(BasePipeline):
"""
控制台输出
"""
def __init__(self):
super().__init__()
def store(self, data: Response, spider: Request) -> bool:
click.echo("***************")
click.echo(spider.id)
click.echo(spider.name)
click.echo(spider.url)
click.echo("@@@@@@@@@@@@@@@")
return True
def exist(self, spider: Request) -> bool:
return False
There is still the Callback piece that I haven't covered; I'll explain it next time together with a concrete example. Some parts of this aren't written all that well, and I hope to polish them later on. I've put up the GitHub address.
Happy Singles' Day, everyone!! As if a programmer could ever have a girlfriend... not a chance, not a chance. Just kidding.
See you next week!!