The Scrapy architecture diagram sets the stage. This article is based on Scrapy 1.6.0; I'm writing it mainly to refresh my own memory of Scrapy.
Below is my rather bare-bones crawler code; it exists only to get Scrapy running. Since I hadn't used Scrapy for quite a while, the meiju spider is one I found online. Unfortunately the original code in its parse method no longer works, so I rewrote it. I did not start it with the scrapy command-line tool, mainly so we can skip the command-line implementation details that nobody really cares about here.
import scrapy
from scrapy.crawler import CrawlerProcess


class MeijuSpider(scrapy.Spider):
    name = 'meiju'
    allowed_domains = ['meijutt.com']
    start_urls = ['https://www.meijutt.com/']

    def parse(self, response):
        movies = response.xpath('//div[@class="l week-hot layout-box"]//ul')
        for each_movie in movies:
            # relative XPath (".//") so we only look inside the current <ul>
            item = each_movie.xpath('.//a/text()').extract_first()
            print(item)
            yield {'name': item}  # parse must yield a Request, dict or Item, not a bare string


if __name__ == '__main__':
    process = CrawlerProcess({
        'USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/72.0.3626.121 Safari/537.36'
    })
    process.crawl(MeijuSpider)
    process.start()
You can see that process.crawl(MeijuSpider) runs first, so let's look at what the crawl method does.
def crawl(self, crawler_or_spidercls, *args, **kwargs):
    crawler = self.create_crawler(crawler_or_spidercls)
    return self._crawl(crawler, *args, **kwargs)
The crawl method does not look complicated; let's go through its two lines. First, create_crawler: there are a few layers in between, so I'll jump straight to the innermost one. It simply builds a Crawler object around MeijuSpider. Here is the Crawler class.
class Crawler(object):

    def __init__(self, spidercls, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)

        d = dict(overridden_settings(self.settings))
        logger.info("Overridden settings: %(settings)r", {'settings': d})

        self.signals = SignalManager(self)
        self.stats = load_object(self.settings['STATS_CLASS'])(self)

        handler = LogCounterHandler(self, level=self.settings.get('LOG_LEVEL'))
        logging.root.addHandler(handler)
        if get_scrapy_root_handler() is not None:
            # scrapy root handler already installed: update it with new settings
            install_scrapy_root_handler(self.settings)
        # lambda is assigned to Crawler attribute because this way it is not
        # garbage collected after leaving __init__ scope
        self.__remove_handler = lambda: logging.root.removeHandler(handler)
        self.signals.connect(self.__remove_handler, signals.engine_stopped)

        lf_cls = load_object(self.settings['LOG_FORMATTER'])
        self.logformatter = lf_cls.from_crawler(self)
        self.extensions = ExtensionManager.from_crawler(self)

        self.settings.freeze()
        self.crawling = False
        self.spider = None
        self.engine = None
It holds spidercls (MeijuSpider), settings, spider and engine. The engine is Scrapy's core component; note already that all the other Scrapy components actually live inside the engine, and the engine in turn is an attribute of the crawler, so the entire flow in the Scrapy architecture diagram is contained in this one object. Also note that up to this point MeijuSpider has not been instantiated; in Scrapy's naming convention, variables whose names end in cls refer to classes that have not been instantiated yet.
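As a quick illustration of that cls convention, here is a minimal sketch (not Scrapy's own code) of the load-now, instantiate-later pattern built on scrapy.utils.misc.load_object:
from scrapy.utils.misc import load_object

# Settings store only dotted paths; load_object turns a path into the class object itself.
scheduler_cls = load_object('scrapy.core.scheduler.Scheduler')  # still a class, hence the *_cls naming
print(scheduler_cls)  # <class 'scrapy.core.scheduler.Scheduler'> -- not an instance yet;
                      # the instance is only created later, when the engine actually needs it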
def _crawl(self, crawler, *args, **kwargs):
    self.crawlers.add(crawler)   # self.crawlers is a set
    d = crawler.crawl(*args, **kwargs)
    self._active.add(d)          # self._active is a set

    def _done(result):
        self.crawlers.discard(crawler)
        self._active.discard(d)
        self.bootstrap_failed |= not getattr(crawler, 'spider', None)
        return result

    return d.addBoth(_done)
This is the _crawl method. Both self.crawlers and self._active are sets used for bookkeeping; the way self.crawlers is implemented is worth a quick mention.
crawlers = property(
    lambda self: self._crawlers,
    doc="Set of :class:`crawlers <scrapy.crawler.Crawler>` started by "
        ":meth:`crawl` and managed by this class."
)
-----------------------------------------------------------------------------------------------------
@property
def crawlers(self):
    return self._crawlers
Above the dashed line is how crawlers is actually implemented in Scrapy; it is equivalent to the code below the line, which you can read as an explanation of it.
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    assert not self.crawling, "Crawling already taking place"

    self.crawling = True  # mark the crawler as running
    try:
        self.spider = self._create_spider(*args, **kwargs)   # instantiate spidercls (MeijuSpider)
        self.engine = self._create_engine()                  # create the engine
        start_requests = iter(self.spider.start_requests())  # wrap start_urls into Request objects, as a generator
        yield self.engine.open_spider(self.spider, start_requests)  # open the spider; internally this starts the engine's components
        yield defer.maybeDeferred(self.engine.start)  # start the engine; a telnet service is also available so you can
                                                      # inspect the running crawler remotely (e.g. current queue sizes)
    except Exception:
        # In Python 2 reraising an exception after yield discards
        # the original traceback (see https://bugs.python.org/issue7563),
        # so sys.exc_info() workaround is used.
        # This workaround also works in Python 3, but it is not needed,
        # and it is slower, so in Python 3 we use native `raise`.
        if six.PY2:
            exc_info = sys.exc_info()

        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()

        if six.PY2:
            six.reraise(*exc_info)
        raise
Back to crawler.crawl: the crawler's running flag is set to True, spidercls is taken from the crawler and instantiated, the engine is instantiated, start_urls are wrapped into Request objects as a generator, the engine opens the spider, and then the engine starts. This method does a lot, so let's begin with _create_engine().
class ExecutionEngine(object):

    def __init__(self, crawler, spider_closed_callback):
        self.crawler = crawler
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
        self.slot = None
        self.spider = None
        self.running = False
        self.paused = False
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
        downloader_cls = load_object(self.settings['DOWNLOADER'])
        self.downloader = downloader_cls(crawler)
        self.scraper = Scraper(crawler)
        self._spider_closed_callback = spider_closed_callback
----------------------------------------------------------------------------------------------------------
@defer.inlineCallbacks
def stop(self):
    if self.crawling:
        self.crawling = False
        yield defer.maybeDeferred(self.engine.stop)
That is the engine's initialization code. spider_closed_callback is the code below the dashed line; it actually belongs to the Crawler class and sets the crawler's running flag to False, tells the engine to stop, and in the process shuts down the individual components. Back in the engine's __init__: the scheduler class is loaded, and the downloader and scraper are instantiated (remember, cls-suffixed variables are not instantiated yet). In other words, the other components from that classic Scrapy architecture diagram are all attributes of the engine. You might ask: the scheduler and downloader both appear in the diagram, but what is this scraper? Hold on, we will get to it very soon. First, the instantiation of the downloader and the scraper.
class Downloader(object):

    def __init__(self, crawler):
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.slots = {}      # {hostname1: slot1, ...}, one slot per domain
        self.active = set()  # requests currently being handled by the downloader
        self.handlers = DownloadHandlers(crawler)  # download handlers for each scheme (http/https/ftp/...)
        self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS')             # total concurrency, default 16
        self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN') # per-domain concurrency, default 8
        self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP')         # per-IP concurrency, default 0 (no limit)
        self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')  # random jitter on the delay, enabled by default:
                                                                                  # with a delay of 1, each wait is drawn from 0.5-1.5
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)  # instantiate the downloader middlewares,
                                                                             # including the ones we wrote ourselves
        self._slot_gc_loop = task.LoopingCall(self._slot_gc)
        self._slot_gc_loop.start(60)
That is the downloader's initialization; it reads some familiar parameters from the settings. A word on the middleware instantiation, i.e. DownloaderMiddlewareManager.from_crawler(crawler).
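For reference, the "middlewares we wrote ourselves" come from the DOWNLOADER_MIDDLEWARES setting; a minimal sketch (the custom middleware path and the numbers are made up for illustration):
# settings.py (illustrative values)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RandomUserAgentMiddleware': 400,  # hypothetical middleware; lower number = process_request runs earlier
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,  # None disables a built-in middleware
}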
@classmethod
def from_settings(cls, settings, crawler=None):
    mwlist = cls._get_mwlist_from_settings(settings)  # load all the middleware classes
    middlewares = []
    enabled = []
    for clspath in mwlist:
        try:
            mwcls = load_object(clspath)
            mw = create_instance(mwcls, settings, crawler)
            middlewares.append(mw)
            enabled.append(clspath)
        except NotConfigured as e:
            if e.args:
                clsname = clspath.split('.')[-1]
                logger.warning("Disabled %(clsname)s: %(eargs)s",
                               {'clsname': clsname, 'eargs': e.args[0]},
                               extra={'crawler': crawler})

    logger.info("Enabled %(componentname)ss:\n%(enabledlist)s",
                {'componentname': cls.component_name,
                 'enabledlist': pprint.pformat(enabled)},
                extra={'crawler': crawler})
    return cls(*middlewares)
First, _get_mwlist_from_settings loads all the middleware classes. There are two groups: the ones that ship with Scrapy and the ones we implement ourselves (DOWNLOADER_MIDDLEWARES in settings). The method does more than load them; it also sorts them. Concretely, after loading both groups it builds a list of (middleware class path, order number) tuples, sorts it by order number ascending, then extracts just the class paths into a new list and returns that. Back in from_settings, the loop instantiates those classes into the middlewares list. Finally, return cls(*middlewares) instantiates the manager; the __init__ involved actually lives in DownloaderMiddlewareManager's parent class MiddlewareManager, so let's look at that initializer next.
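A minimal sketch of what that merging-and-sorting step boils down to (illustrative only; Scrapy's real logic lives in scrapy.utils.conf.build_component_list):
# Roughly: merge the built-in dict with the user dict, then sort by order number.
base = {'scrapy.downloadermiddlewares.retry.RetryMiddleware': 550}   # built-in group (abridged)
custom = {'myproject.middlewares.RandomUserAgentMiddleware': 400}    # hypothetical user group

merged = dict(base, **custom)  # user settings override the built-ins
mwlist = [path for path, order in sorted(merged.items(), key=lambda kv: kv[1])]
# -> class paths with smaller numbers come first in mwlist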
def __init__(self, *middlewares):
    self.middlewares = middlewares
    self.methods = defaultdict(deque)
    for mw in middlewares:
        self._add_middleware(mw)
It creates a dict whose default value is a deque; the loop below it is clearly where the middlewares get registered.
def _add_middleware(self, mw):
    if hasattr(mw, 'process_request'):
        self.methods['process_request'].append(mw.process_request)
    if hasattr(mw, 'process_response'):
        self.methods['process_response'].appendleft(mw.process_response)
    if hasattr(mw, 'process_exception'):
        self.methods['process_exception'].appendleft(mw.process_exception)
When we write our own middleware, the actual logic goes in process_request, process_response and process_exception; here those methods are grouped by name and stored in the methods dict. A deque's append adds at the right (tail) and appendleft adds at the left (head), and each chain is iterated from head to tail. The result is that a downloader middleware with a smaller order number has its process_request called earlier and its process_response called later (it sits closer to the engine), while a larger number sits closer to the downloader. That is the whole mechanism behind middleware execution order in Scrapy.
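A tiny stand-alone demonstration of that append/appendleft trick (illustrative code, not Scrapy's):
from collections import deque

order = [('A', 400), ('B', 550)]          # already sorted by order number
req_chain, resp_chain = deque(), deque()
for name, _ in order:
    req_chain.append(name + '.process_request')        # tail
    resp_chain.appendleft(name + '.process_response')  # head

print(list(req_chain))   # ['A.process_request', 'B.process_request']   -> smaller number runs first
print(list(resp_chain))  # ['B.process_response', 'A.process_response'] -> smaller number runs last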
class Scraper(object):

    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)  # instantiate the spider middlewares
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)  # instantiate the item pipelines
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
Now the scraper: clearly, both the spider middlewares and the item pipelines are instantiated inside the scraper, so at this point we have located every component from the Scrapy architecture diagram. That wraps up the creation of the engine.
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
    assert self.has_capacity(), "No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened", extra={'spider': spider})
    nextcall = CallLaterOnce(self._next_request, spider)
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    yield scheduler.open(spider)
    yield self.scraper.open_spider(spider)
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    slot.nextcall.schedule()
    slot.heartbeat.start(5)
Now engine.open_spider, which does even more: it wraps _next_request in a CallLaterOnce object (a thin wrapper around Twisted scheduling), instantiates the scheduler, runs start_requests through the spider middlewares' process_start_requests (if any are defined; there are none by default), creates the slot, and then performs the various open calls.
@classmethod
def from_crawler(cls, crawler):
    settings = crawler.settings
    dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
    dupefilter = create_instance(dupefilter_cls, settings, crawler)
    pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
    dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
    mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
    logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS',
                                settings.getbool('SCHEDULER_DEBUG'))
    return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
               stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)
First, the scheduler's instantiation, which is not complicated: it instantiates the duplicate filter and loads pqclass (the priority queue class), dqclass (the disk queue class, LIFO by default) and mqclass (the memory queue class, also LIFO by default). Note that dqclass and mqclass are not instantiated here; they are handed to pqclass later, which we will get to below. Next, slot = Slot(start_requests, close_if_idle, nextcall, scheduler).
class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set()  # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.heartbeat = task.LoopingCall(nextcall.schedule)
Here we can see that the scheduler is actually attached to engine.slot. nextcall is bound to engine._next_request, which is where request scheduling begins (more on that later). As for heartbeat: the name suggests a heartbeat, but unlike the "I'm still alive" heartbeat of many frameworks, this one simply runs engine._next_request at a fixed interval (5 seconds, set by the last line of open_spider). Next, scheduler.open().
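A minimal stand-alone sketch of what that heartbeat amounts to (illustrative; Scrapy schedules nextcall.schedule rather than a print):
from twisted.internet import reactor, task

def tick():
    print('pretend this is engine._next_request')

heartbeat = task.LoopingCall(tick)
heartbeat.start(5)                    # call tick() now and then every 5 seconds
reactor.callLater(12, reactor.stop)   # stop the demo after 12 seconds
reactor.run()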
def open(self, spider):
    self.spider = spider
    self.mqs = self.pqclass(self._newmq)           # wrap mqclass in a priority queue
    self.dqs = self._dq() if self.dqdir else None  # disabled (None) by default; if enabled, dqclass is wrapped the same way
    return self.df.open()

def _newmq(self, priority):
    return self.mqclass()
self.mqs is instantiated by wrapping mqclass in the priority queue (pqclass has no queue storage of its own; it only adds priority behaviour). mqclass itself is a LIFO queue, so self.mqs is both priority-aware and LIFO, and it is exactly this LIFO behaviour that gives Scrapy its default depth-first order. So what do you do if you want Scrapy to crawl breadth-first? Swap mqs for a FIFO queue, of course. The disk queue is disabled (None) by default; if enabled it is wrapped just like mqs. Finally, the duplicate filter is opened.
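For reference, the documented way to switch to breadth-first order is to change these settings (values as given in the Scrapy FAQ):
# settings.py -- breadth-first crawl
DEPTH_PRIORITY = 1
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleFifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.FifoMemoryQueue'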
Now back to the last two lines of open_spider: slot.nextcall.schedule() and slot.heartbeat.start(5). They push nextcall into Twisted's event loop, and start(5), as mentioned, sets the heartbeat interval to 5 seconds, i.e. engine._next_request runs every 5 seconds. At this point process.crawl(MeijuSpider) is done.
Next is process.start(). There is not much to say about it: it sets up a thread pool of 10 threads by default for work outside the main flow, and the handling of pause/stop signals is wired up here too, all inside Twisted. The important part is that it calls reactor.run(), which starts Twisted's main loop. Now Scrapy really starts to move, and the first thing to run is the _next_request method that nextcall was bound to earlier.
def _next_request(self, spider):
    slot = self.slot
    if not slot:
        return

    if self.paused:
        return

    while not self._needs_backout(spider):  # back out if the engine is not running, the slot is closing,
                                            # or the downloader / scraper already have too much in flight
        if not self._next_request_from_scheduler(spider):  # only does real work once the scheduler holds requests;
                                                           # on the very first pass it simply returns
            break

    if slot.start_requests and not self._needs_backout(spider):
        try:
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            self.crawl(request, spider)

    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)
----------------------------------------------------------------------------------------------------------
def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    self.schedule(request, spider)
    self.slot.nextcall.schedule()
The first time we enter this method is the first time start_requests gets iterated. On that pass the while block returns immediately, because the scheduler's queue holds no requests yet, so look at the if block that does run: it calls self.crawl(request, spider), the code below the dashed line, which calls the engine's schedule method to enqueue the request into the scheduler's queue (the memory queue mqs by default) and then calls nextcall (engine._next_request) once more. On that second pass the scheduler's queue does have a request, so the while body really executes; and if start_requests still has values (the start_urls = [url1, url2, ...] case in our own spider) the remaining URLs keep getting enqueued as well. This time let's look at self._next_request_from_scheduler(spider) inside the while.
def _next_request_from_scheduler(self, spider):
    slot = self.slot
    request = slot.scheduler.next_request()  # dequeue: first from mqs (memory), falling back to dqs (disk) if empty
    if not request:  # nothing queued yet -- this is where the very first start_requests pass returns
        return
    d = self._download(request, spider)                         # download
    d.addBoth(self._handle_downloader_output, request, spider)  # handle the response: spider middlewares, then item pipelines
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))  # remove it from the in-progress marker set
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())  # schedule nextcall again to keep processing requests
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
This method first dequeues a request, then passes it into the _download method and attaches a series of callbacks. Let's look at _download first.
def _download(self, request, spider):
    slot = self.slot
    slot.add_request(request)  # add it to the in-progress set (slot.inprogress, a set)

    def _on_success(response):
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            response.request = request  # tie request to response received
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received,
                                        response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        slot.nextcall.schedule()
        return _

    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
First the request is added to the engine slot's inprogress set, which marks it as currently being processed; note that this marks the request as being somewhere in the full pipeline (the whole flow from the architecture diagram), not inside one particular component such as the downloader. Further down, self.downloader.fetch(request, spider) is clearly where the downloader component gets called; the callbacks attached afterwards just log and keep scheduling requests from the queue. So let's dive into the downloader component.
def fetch(self, request, spider):
    def _deactivate(response):
        self.active.remove(request)
        return response

    self.active.add(request)  # mark the request as being inside the downloader
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    return dfd.addBoth(_deactivate)  # once the download finishes, drop the request from that marker set
This method updates the downloader's state: the request is added to the set of requests currently being downloaded, and removed again once the download finishes. By now you may have noticed that pretty much every component keeps such an in-progress marker set. Their purpose is to make Scrapy's runtime behaviour observable: you can see each component's load at a glance and use that to find performance bottlenecks; Scrapy's built-in telnet status view is essentially implemented as len(marker set). Back on topic: let's continue with self.middleware.download(self._enqueue_request, request, spider).
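A hedged illustration of that idea, assuming you somehow hold a reference to the running crawler (e.g. inside an extension); the attribute names are the ones discussed in this article:
# Illustrative only: reading the marker sets described above from a live crawler object.
def report_load(crawler):
    engine = crawler.engine
    print('requests in the whole pipeline:', len(engine.slot.inprogress))
    print('requests inside the downloader:', len(engine.downloader.active))
    print('requests being transferred    :', sum(len(s.transferring)
                                                 for s in engine.downloader.slots.values()))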
def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_request must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            if response:
                defer.returnValue(response)
        defer.returnValue((yield download_func(request=request, spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)

        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred
The download method runs the downloader middleware chain: first process_request for every middleware; if none of them short-circuits, the download_func passed in (downloader._enqueue_request) is called; then, depending on whether the download succeeded, process_response or process_exception runs.
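To make that contract concrete, here is a hedged example of a custom downloader middleware obeying the return-value rules asserted above (the class name and header value are made up):
class StubMiddleware(object):
    """Hypothetical downloader middleware, used only to illustrate the hooks."""

    def process_request(self, request, spider):
        # return None to let the next middleware / the real download run
        request.headers.setdefault('X-Debug', '1')
        return None

    def process_response(self, request, response, spider):
        # must return a Response (pass it on) or a Request (re-schedule it)
        if response.status == 503:
            return request.copy()  # retry-style behaviour, just for illustration
        return response

    def process_exception(self, request, exception, spider):
        # return None so other middlewares / errbacks get to handle the failure
        return None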
def _enqueue_request(self, request, spider):
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    self.signals.send_catch_log(signal=signals.request_reached_downloader,
                                request=request,
                                spider=spider)
    deferred = defer.Deferred().addBoth(_deactivate)
    slot.queue.append((request, deferred))  # add to this slot's pending queue (a deque)
    self._process_queue(spider, slot)
    return deferred
It starts by getting the key (the hostname) and its corresponding slot. Up to this point downloader.slots has actually been empty; the real instantiation happens inside self._get_slot(request, spider): if no slot exists for the request it creates one, takes the hostname of the request's URL as the key, stores downloader.slots[key] = slot, and returns both. This one-slot-per-hostname design is what makes per-domain concurrency limits easy to enforce. Back in _enqueue_request: the request is tagged with its slot key, added to the slot's in-progress set, appended to the slot's queue, and finally downloader._process_queue is called.
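A minimal sketch of the one-slot-per-hostname idea (illustrative, not the real _get_slot, which also honours the download_slot meta key and the per-IP setting):
from urllib.parse import urlparse

slots = {}

def get_slot(url):
    key = urlparse(url).hostname  # e.g. 'www.meijutt.com'
    if key not in slots:
        slots[key] = {'queue': [], 'active': set()}  # stand-in for the Downloader's Slot object
    return key, slots[key]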
def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return

    # Delay queue processing if a download_delay is configured
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return

    # Process enqueued requests if there are free slots to transfer for this slot
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        request, deferred = slot.queue.popleft()     # dequeue
        dfd = self._download(slot, request, spider)  # download
        dfd.chainDeferred(deferred)
        # prevent burst if inter-request delays were configured
        if delay:
            self._process_queue(spider, slot)
            break
If a delay is configured, the method waits it out first; remember that with randomization enabled the actual value is drawn from 0.5x to 1.5x the configured delay. After that, as long as the queue has requests and the number currently being transferred has not hit the limit, it pops a request from the queue and hands it to downloader._download.
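That randomization boils down to something like the following sketch (it mirrors the behaviour described above when RANDOMIZE_DOWNLOAD_DELAY is on, not the exact source):
import random

def download_delay(configured_delay, randomize=True):
    if randomize:
        return random.uniform(0.5 * configured_delay, 1.5 * configured_delay)
    return configured_delay

# e.g. DOWNLOAD_DELAY = 1 -> each wait ends up somewhere between 0.5s and 1.5s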
def _download(self, slot, request, spider):
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response, request=request, spider=spider)
        return response
    dfd.addCallback(_downloaded)

    slot.transferring.add(request)  # mark the request as actually being transferred

    def finish_transferring(_):
        slot.transferring.remove(request)
        self._process_queue(spider, slot)
        return _

    return dfd.addBoth(finish_transferring)
The very first line hands off to self.handlers.download_request, which is essentially Scrapy's last level of downloading: it picks the download handler that matches the request's scheme and downloads with it (a hedged settings sketch of that scheme-to-handler mapping follows after the list below). A few methods deeper it is Twisted doing the actual download, and we will stop there. Back in downloader._download: the request is marked as being transferred, the marker is removed once the download finishes, and the queue keeps being processed. There are quite a few marker sets by now, and it is easy to get them mixed up, so let's sort them out:
- downloader.active (set): the request has entered the downloader component
- downloader.slot.active (set): the request has entered its slot
- downloader.slot.transferring (set): the request is actually being downloaded
- downloader.slot.queue (deque): requests waiting to be downloaded
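As promised, a hedged sketch of the scheme-to-handler mapping; these dotted paths mirror Scrapy 1.6's defaults, but check DOWNLOAD_HANDLERS_BASE in scrapy/settings/default_settings.py for the authoritative (and longer) list:
# Roughly what the default per-scheme handler mapping looks like (abridged)
DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}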
OK, the download path is done. After the download, the response goes through the downloader's response middlewares as described earlier. Now let's follow where the response goes after the downloader middlewares, namely the _handle_downloader_output callback that was attached earlier in engine._next_request_from_scheduler.
def _handle_downloader_output(self, response, request, spider):
    assert isinstance(response, (Request, Response, Failure)), response

    # downloader middleware can return requests (for example, redirects)
    if isinstance(response, Request):  # if a Request comes back, schedule it for crawling again
        self.crawl(response, spider)
        return

    # response is a Response or Failure
    d = self.scraper.enqueue_scrape(response, request, spider)  # hand it to the scraper
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d
OK, Scrapy's own comment here says it clearly: the result can be a Request object, for example with 301/302 redirects, in which case engine.crawl is called to keep crawling it. The part we care about is handing the response over to the scraper: self.scraper.enqueue_scrape(response, request, spider).
def enqueue_scrape(self, response, request, spider):
    slot = self.slot
    dfd = slot.add_response_request(response, request)  # enqueue the response

    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _

    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd
The response is enqueued first, a few callbacks are attached, and then scraper._scrape_next is called.
def _scrape_next(self, spider, slot):
    while slot.queue:
        response, request, deferred = slot.next_response_request_deferred()  # dequeue a response
        self._scrape(response, request, spider).chainDeferred(deferred)      # hand it over for further processing
OK, after dequeuing, the response is handed to the next method.
def _scrape(self, response, request, spider):
    """Handle the downloaded response or failure through the spider
    callback/errback"""
    assert isinstance(response, (Response, Failure))

    dfd = self._scrape2(response, request, spider)  # returns spiders processed output
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd
The arguments are passed further down to scraper._scrape2 and a few callbacks are attached. The self.handle_spider_output callback sits at the tail end of the flow, so we will analyse it last; first, scraper._scrape2.
def _scrape2(self, request_result, request, spider):
    """Handle the different cases of request's result been a Response or a
    Failure"""
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # FIXME: don't ignore errors in spider middleware
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)
A successful download takes the if branch, a failure takes the else, and either way scraper.call_spider ends up being called; here we will only follow the success path. For the failure branch, the failure is ultimately handed to request.errback; since the success path gets a similar treatment (just routed through the spider middlewares), we won't expand on the failure details here.
def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f: '%s.%s' % (
        six.get_method_self(f).__class__.__name__,
        six.get_method_function(f).__name__)

    def process_spider_input(response):
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                    'Middleware %s must returns None or ' \
                    'raise an exception, got %s ' \
                    % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result), \
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result), \
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    dfd = mustbe_deferred(process_spider_input, response)
    dfd.addErrback(process_spider_exception)
    dfd.addCallback(process_spider_output)
    return dfd
This runs the spider middleware chain, and once the process_spider_input middlewares are done it calls the scrape_func passed in (scraper.call_spider). A quick note on the process_spider_output middlewares: that is where things like the crawl depth and the Referer header get attached to the requests the spider yields. Next, scraper.call_spider.
def call_spider(self, result, request, spider):
    result.request = request
    dfd = defer_result(result)
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)
Look at the third line inside the method: if the request has a callback bound, that callback is added; if not, the parse method we implemented in our own spider (MeijuSpider) is used, and request.errback is attached as the error callback (both request.callback and request.errback are None by default). So we have finally found where our own parse method gets called: inside the scraper. With the spider middlewares done, the item pipelines are next, i.e. the scraper.handle_spider_output callback we mentioned earlier.
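As a reminder of what that callback binding looks like from the spider side, a hedged example (the URL and the method names are made up for illustration):
import scrapy

class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['https://www.meijutt.com/']

    def parse(self, response):
        # a request yielded with an explicit callback skips spider.parse on its response
        yield scrapy.Request('https://www.meijutt.com/some-page.html',
                             callback=self.parse_detail,
                             errback=self.on_error)

    def parse_detail(self, response):
        yield {'url': response.url}

    def on_error(self, failure):
        self.logger.error(repr(failure))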
def handle_spider_output(self, result, request, response, spider):
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    dfd = parallel(it, self.concurrent_items,
                   self._process_spidermw_output, request, response, spider)
    return dfd
The result here is whatever the earlier callbacks produced (request.callback, parse, or request.errback). Next, scraper._process_spidermw_output.
def _process_spidermw_output(self, output, request, response, spider):
    """Process each Request/Item (given in the output parameter) returned
    from the given spider
    """
    if isinstance(output, Request):
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        self.slot.itemproc_size += 1
        dfd = self.itemproc.process_item(output, spider)  # run the item pipelines
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})
If the output (i.e. the result from above) is a Request, it goes back into the crawl scheduling; if it is an item or a dict, it goes through the final stage, the item pipelines.
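For completeness, a hedged sketch of what sits at the receiving end of itemproc.process_item, i.e. a user-defined pipeline (the class name and priority value are illustrative):
# pipelines.py
class PrintPipeline(object):
    """Hypothetical pipeline: process_item is what itemproc.process_item fans out to."""

    def process_item(self, item, spider):
        spider.logger.info('got item: %r', item)
        return item  # return the item (or raise DropItem) so later pipelines keep running

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.PrintPipeline': 300,
}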
That is a rough pass through Scrapy. What we can see now is that Scrapy is an asynchronous framework with a bit of threading on the side: threads only show up in places unrelated to the main flow (things like signal handling), while the crawling flow itself is driven by Twisted's asynchronous machinery. CONCURRENT_REQUESTS therefore does not control threads; the concurrent requests behave more like coroutines, so go ahead and turn it up.