官方文档:http://docs.pyspider.org/
中文网址:http://www.pyspider.cn/book/pyspider/
最新版本: https://github.com/binux/pyspider/releases
PySpider: 一个国人编写的强大的网络爬虫系统并带有强大的WebUI。采用Python语言编写,分布式架构,支持多种数据库后端,强大的WebUI支持脚本编辑器,任务监视器,项目管理器以及结果查看器
pyspider是作者之前做的一个爬虫架构的开源化实现。主要的功能需求是:
- 抓取、更新调度多站点的特定的页面
- 需要对页面进行结构化信息提取
- 灵活可扩展,稳定可监控 而这也是绝大多数python爬虫的需求 —— 定向抓取,结构化化解析。但是面对结构迥异的各种网站,单一的抓取模式并不一定能满足,灵活的抓取控制是必须的。为了达到这个目的,单纯的配置文件往往不够灵活,于是,通过脚本去控制抓取是我最后的选择。 而去重调度,队列,抓取,异常处理,监控等功能作为框架,提供给抓取脚本,并保证灵活性。最后加上web的编辑调试环境,以及web任务监控,即成为了这套框架。
pyspider的设计基础是:以python脚本驱动的抓取环模型爬虫
*通过python脚本进行结构化信息的提取,follow链接调度抓取控制,实现最大的灵活性
*通过web化的脚本编写、调试环境。web展现调度状态
- 抓取环模型成熟稳定,模块间相互独立,通过消息队列连接,从单进程到多机分布式灵活拓展
安装:
添加依赖
- sudo apt-get install python python-dev python-distribute python-pip libcurl4-openssl-dev libxml2-dev libxslt1-dev python-lxml libssl-dev zlib1g-dev
- sudo apt-get install phantomjs
- pip3 install pyspider
启动:
- pyspider all
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Created on 2019-01-09 14:03:21
# Project: lianjia
from pyspider.libs.base_handler import *
import json
import pymongo
import pymysql
class Handler(BaseHandler):
#pyspider爬虫的主类,在这里进行爬取解析和存储数据
#crawl_config:在这个参数中可以做全局的设置(UA,Header,proxy)
crawl_config = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'
}
#创建mongodb数据库连接
mongo_cli = pymongo.MongoClient('127.0.0.1',27017)
#获取要操作的数据库
db = mongo_cli['lianjia']
#获取数据库下的集合
col = db['lianjiacol']
#创建mysql连接
#mysql_cli = pymysql.Connect('127.0.0.1','root','nihao123','lianjia',3306,charset='utf8')
#cursor = mysql_cli.cursor()
#定时每隔一天进行重复请求,重新执行on_start方法
@every(minutes=24 * 60)
def on_start(self):
#根据crawl发起请求
self.crawl('https://bj.lianjia.com/ershoufang/', callback=self.index_page)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
#response.doc 是一个pyquery对象
#response.etree:返回的是一个lxml对象,可以使用xpath语法
#提取每一个房源的详情地址url
hourse_infos = response.doc('ul.sellListContent li.clear.LOGCLICKDATA')
for hourse in hourse_infos.items():
detail_url = hourse('div.title a').attr.href
self.crawl(detail_url,callback=self.detail_page)
#提取下一页发起请求
#data = response.doc('div.page-box.house-lst-page-box').attr.page-data
data = response.etree.xpath('//div[@class="page-box house-lst-page-box"]/@page-data')[0]
print('data啊',data)
json_data = json.loads(data)
#print('nextdata',json_data)
cur_page = int(json_data['curPage'])
total_page = int(json_data['totalPage'])
if cur_page<total_page:
#发起下一页请求
next_page = cur_page+1
next_url = 'https://bj.lianjia.com/ershoufang/pg%s/'%str(next_page)
self.crawl(next_url,callback = self.index_page)
#next_url = response.doc()
#for each in response.doc('a[href^="http"]').items():
#self.crawl(each.attr.href, callback=self.detail_page)
@config(priority=2)
def detail_page(self, response):
print('二手房详情获取成功')
#获取详情的数据
info = {}
#标题(获取标签的属性和文本)
#info['title'] = response.doc('h1.main').attr.title
info['title'] = response.doc('h1.main').text()
#描述
info['sub_title'] = response.doc('div.sub').text()
#关注人数
info['attenNum'] = response.doc('#favCount').text()
#预约看房
info['yuyueNum'] = response.doc('#cartCount').text()
#总价
info['price'] = response.doc('div.price.total').text()
#每平
info['unitPrice'] = response.doc('span.unitPriceValue').text()
#规格
info['room'] = response.doc('div.room div.mainInfo').text()
#朝向
info['type'] = response.doc('div.type div.mainInfo').text()
#面积
info['area'] = response.doc('div.area div.mainInfo').text()
#小区名称
info['aroundinfo'] = response.doc('div.communityName span.label').text()
#print(info)
return info
def on_result(self,result):
print('获取到了结果',result)
if result:
try:
self.col.insert(result)
print('数据存储成功')
#sql = """
#insert into lianjiadb(%s) values(%s)
#"""
#%(','.join(result.keys()),
#','.join(['%s']*len(result)))
#data = list(result.value())
except Exception as err:
print('数据插入失败',err)
#return {
#"url": response.url,
#"title": response.doc('title').text(),
#}
Overview(组件)
Scheduler(调度)
调度程序从处理器的newtask_queue接收任务。确定任务是新任务还是需要重新爬网。根据优先级对任务进行排序,并将其提供给具有流量控制的提取器(令牌桶算法)。处理定期任务,丢失任务和失败的任务,然后重试
以上所有都可以通过self.crawl
API设置
Fetcher(提取程序)
Fetcher负责获取网页,然后将结果发送给处理器。对于灵活的,fetcher支持数据URI和由JavaScript呈现的页面(通过phantomjs)。可以通过API通过脚本控制获取方法,标头,cookie,代理,etag等。
Phantomjs Fetcher
Phantomjs Fetcher就像代理一样工作。它连接到一般的Fetcher,在启用JavaScript的情况下获取和渲染页面,将一般HTML输出回Fetcher
Processor(处理器)
理器负责运行用户编写的脚本来解析和提取信息。您的脚本在无限制的环境中运行。虽然我们有各种工具(如PyQuery)供您提取信息和链接,但您可以使用任何想要处理响应的内容。您可以参考脚本环境和API参考以获取有关脚本的更多信息
处理器将捕获异常和日志,发送状态(任务跟踪)和新任务scheduler,将结果发送到Result Worker。
WebUI
WebUI是一切的Web前端。它包含:
- 脚本编辑器,调试器
- 专案经理
- 任务监视器
- 结果查看器,导出器
也许webui是pyspider最吸引人的部分。有了这个功能强大的UI,您可以像pyspider一样逐步调试脚本。启动或停止项目。查找哪个项目出错并且哪个请求失败,然后使用调试器再次尝试。
Data flow(数据流)
pyspider中的数据流如上图所示:
- on_start当您按下RunWebUI上的按钮时,每个脚本都有一个名为callback的回调。将新任务on_start作为项目条目提交给Scheduler。
- 调度程序将此on_start任务调度为数据URI作为Fetcher的常规任务。
- Fetcher发出请求并对其做出响应(对于数据URI,这是一个虚假的请求和响应,但与其他正常任务没有区别),然后提供给处理器。
4.处理器调用该on_start方法并生成一些新的URL以进行爬网。处理器向Scheduler发送一条消息,表示此任务已完成,新任务通过消息队列发送到Scheduler(on_start在大多数情况下,这里没有结果。如果有结果,处理器将它们发送到result_queue)。
5.调度程序接收新任务,在数据库中查找,确定任务是新的还是需要重新爬网,如果是,则将它们放入任务队列。按顺序发送任务。
6.这个过程重复(从第3步开始)并且在WWW死亡之前不会停止;-)。调度程序将检查定期任务以抓取最新数据
self.crawl参数
url
要抓取的网址或网址列表
callback
the method to parse the response. _ default _: __ call __
def on_start(self):
self.crawl('http://scrapy.org/', callback=self.index_page)
可选参数
age
任务的有效期。在此期间,该页面将被视为未修改。默认值:-1(从不重新抓取)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
...
回调解析的每个页面index_page都将被视为在10天内未被更改。如果您在上次抓取后的10天内提交任务,则会将其丢弃。
priority(优先级)
任务安排的优先级越高越好。默认值:0
def index_page(self):
self.crawl('http://www.example.org/page2.html', callback=self.index_page)
self.crawl('http://www.example.org/233.html', callback=self.detail_page,
priority=1)
该页面233.html
之前将被抓取page2.html
。使用此参数可以执行BFS并减少队列中的任务数(这可能会花费更多的内存资源)
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Created on 2019-01-09 21:23:14
# Project: douyutv
from pyspider.libs.base_handler import *
import pymongo
class Handler(BaseHandler):
crawl_config = {
'Usea-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'
}
#创建mongodb数据库连接
mongo_cli = pymongo.MongoClient('127.0.0.1',27017)
#获取要操作的数据库
db = mongo_cli['douyutv']
#获取数据库下的集合
col = db['douyutvcol']
@every(minutes=24 * 60)
def on_start(self):
self.crawl('https://www.douyu.com/directory', callback=self.index_page)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
type_infos = response.doc('ul#live-list-contentbox li.unit ')
#type_infos = response.etree.xpath('//ul[@id="live-list-contentbox"]/li[@class="unit "]')
#print(type(type_infos))
#print(type_infos)
for type_info in type_infos.items():
#print('aaaa')
type_url = type_info('a').attr.href
#type_url = type_info('./a/text()')
#print(type_url)
self.crawl(type_url,callback = self.detail_page)
@config(priority=2)
def detail_page(self, response):
print('开始获取分类详情')
detail_data = {}
#首先获取房间信息
detail_info = response.doc('ul#live-list-contentbox li')
#房间标题
#要遵循css语法,比如id用#,class用. 另外注意标签里的空格用.表示
detail_data['title'] = detail_info('h3.ellipsis').text()
#主播类型
detail_data['type'] = detail_info('span.tag.ellipsis').text()
#主播昵称
detail_data['name'] = detail_info('span.dy-name.ellipsis.fl').text()
#主播热度
detail_data['hot'] = detail_info('span.dy-num.fr').text()
print(detail_data)
return detail_data
#page_data = response.doc('div#J-pager a.shark-pager-item current')
try:
a = response.etree.xpath('//div[@J-pager]/a[@class="shark-pager-next"]')
a.click()
self.crawl(a,callback = self.detail_page)
except Exception as err:
print('没有下一页了')
def on_result(self,result):
print('获取到了结果',result)
if result:
try:
self.col.insert(result)
print('数据存储成功')
except Exception as err:
print('数据插入失败',err)