Scrapy Source Code Analysis

[Figure: Scrapy flowchart]

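The classic Scrapy flowchart sits at the top for reference. This article is based on Scrapy 1.6.0, and I wrote it mainly to refresh my own memory of Scrapy.
The code below is my rather bare-bones spider; its only purpose is to get Scrapy running. I should mention that since I hadn't used Scrapy for a long time, I found this meiju spider online. Unfortunately the original code in its parse method no longer worked, so I rewrote it. I don't launch it with the scrapy command line, which lets me skip the less interesting details of how the command line is implemented.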

import scrapy
from scrapy.crawler import CrawlerProcess


class MeijuSpider(scrapy.Spider):
    name = 'meiju'
    allowed_domains = ['meijutt.com']
    start_urls = ['https://www.meijutt.com/']

    def parse(self, response):
        movies = response.xpath('//div[@class="l week-hot layout-box"]//ul')
        for each_movie in movies:
            # './/' keeps the query relative to this <ul>; '//' would search the whole page
            title = each_movie.xpath('.//a/text()').extract_first()
            print(title)
            # callbacks must yield Request, Item, dict or None, so wrap the text in a dict
            yield {'title': title}


if __name__ == '__main__':
    process = CrawlerProcess({
        'USER_AGENT': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
    })
    process.crawl(MeijuSpider)
    process.start()

As you can see, process.crawl(MeijuSpider) runs first. Let's look at what the crawl method does:

    def crawl(self, crawler_or_spidercls, *args, **kwargs):
        crawler = self.create_crawler(crawler_or_spidercls)
        return self._crawl(crawler, *args, **kwargs)

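The crawl method doesn't look complicated, so let's go through its two lines one at a time. First, create_crawler: there are a few layers of indirection, so I'll jump straight to the innermost one, which essentially creates a Crawler object from MeijuSpider. Here is the Crawler class: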

class Crawler(object):

    def __init__(self, spidercls, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)

        d = dict(overridden_settings(self.settings))
        logger.info("Overridden settings: %(settings)r", {'settings': d})

        self.signals = SignalManager(self)
        self.stats = load_object(self.settings['STATS_CLASS'])(self)

        handler = LogCounterHandler(self, level=self.settings.get('LOG_LEVEL'))
        logging.root.addHandler(handler)
        if get_scrapy_root_handler() is not None:
            # scrapy root handler already installed: update it with new settings
            install_scrapy_root_handler(self.settings)
        # lambda is assigned to Crawler attribute because this way it is not
        # garbage collected after leaving __init__ scope
        self.__remove_handler = lambda: logging.root.removeHandler(handler)
        self.signals.connect(self.__remove_handler, signals.engine_stopped)

        lf_cls = load_object(self.settings['LOG_FORMATTER'])
        self.logformatter = lf_cls.from_crawler(self)
        self.extensions = ExtensionManager.from_crawler(self)

        self.settings.freeze()
        self.crawling = False
        self.spider = None
        self.engine = None

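It holds spidercls (MeijuSpider), settings, spider and engine. The engine is Scrapy's core component. As a quick preview: all of Scrapy's other components live inside the engine, and the engine is an attribute of the crawler, so the entire flow shown in the Scrapy flowchart is contained in this one object. Note that MeijuSpider has not been instantiated yet: in Scrapy, variables whose names end in cls hold classes that have not been instantiated.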

    def _crawl(self, crawler, *args, **kwargs):
        self.crawlers.add(crawler)  # type set
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)  # type set

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            self.bootstrap_failed |= not getattr(crawler, 'spider', None)
            return result

        return d.addBoth(_done)

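This is the _crawl method. self.crawlers and self._active are both sets, and adding to them here effectively registers the crawler and its Deferred as active. The way self.crawlers is implemented is worth a brief mention: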

crawlers = property(
    lambda self: self._crawlers,
    doc="Set of :class:`crawlers <scrapy.crawler.Crawler>` started by "
        ":meth:`crawl` and managed by this class."
)
-----------------------------------------------------------------------------------------------------
@property
def crawlers(self):
    return self._crawlers

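Above the dashed line is how Scrapy actually implements crawlers. Apart from the docstring, it is equivalent to the code below the line, which is the easier way to read it. Now for crawler.crawl, which _crawl calls: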

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True   # flag: this crawler is running

        try:
            self.spider = self._create_spider(*args, **kwargs)  # instantiate spidercls (MeijuSpider)
            self.engine = self._create_engine() # create the engine
            start_requests = iter(self.spider.start_requests()) # start_requests() wraps start_urls in Request objects (a generator)
            yield self.engine.open_spider(self.spider, start_requests) # open the spider; this also opens the engine's components
            yield defer.maybeDeferred(self.engine.start) # start the engine; the engine_started signal also starts the telnet console, which lets you inspect the running crawl remotely (e.g. queue sizes)
        except Exception:
            # In Python 2 reraising an exception after yield discards
            # the original traceback (see https://bugs.python.org/issue7563),
            # so sys.exc_info() workaround is used.
            # This workaround also works in Python 3, but it is not needed,
            # and it is slower, so in Python 3 we use native `raise`.
            if six.PY2:
                exc_info = sys.exc_info()

            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()

            if six.PY2:
                six.reraise(*exc_info)
            raise

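Continuing with crawler.crawl: first the crawler's running flag is set to True, then spidercls is taken from the crawler and instantiated, the engine is created, start_urls are wrapped in Requests and turned into a generator, the engine opens the spider, and finally the engine starts. Since this method does quite a lot, we'll start with _create_engine():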

class ExecutionEngine(object):

    def __init__(self, crawler, spider_closed_callback):
        self.crawler = crawler
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
        self.slot = None
        self.spider = None
        self.running = False
        self.paused = False
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
        downloader_cls = load_object(self.settings['DOWNLOADER'])
        self.downloader = downloader_cls(crawler)
        self.scraper = Scraper(crawler)
        self._spider_closed_callback = spider_closed_callback
----------------------------------------------------------------------------------------------------------
    @defer.inlineCallbacks
    def stop(self):
        if self.crawling:
            self.crawling = False
            yield defer.maybeDeferred(self.engine.stop)

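That is the engine's initialization code. spider_closed_callback is the code below the dashed line. It belongs to the Crawler class: it sets the crawler's running flag to False and stops the engine, which in turn clears the engine's running flag and then closes each component. Back to the engine's __init__: it loads the scheduler class and instantiates the downloader and the scraper (remember, variables ending in cls are classes that have not been instantiated). In other words, the other components in the classic Scrapy flowchart are all attributes of the engine. You might ask: the scheduler and the downloader both appear in the flowchart, but what is this scraper? Don't worry, we'll find out shortly. Let's look at how the downloader and the scraper are instantiated: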

class Downloader(object):

    def __init__(self, crawler):
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.slots = {} # {hostname1: slot1, ...}: one slot per domain
        self.active = set()  # requests currently being handled by the downloader
        self.handlers = DownloadHandlers(crawler)  # download handlers for each scheme (http/https/ftp, etc.)
        self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS') # total concurrency, default 16
        self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN') # per-domain concurrency, default 8
        self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP') # per-IP concurrency, default 0 (not used; the per-domain limit applies)
        self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY') # on by default: with a delay of 1, each actual wait is drawn from 0.5-1.5
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)  # instantiate the downloader middlewares; our own are instantiated here too
        self._slot_gc_loop = task.LoopingCall(self._slot_gc)
        self._slot_gc_loop.start(60)

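That's how the downloader component is instantiated; note that it reads several familiar settings. A word on how the middlewares are instantiated, i.e. DownloaderMiddlewareManager.from_crawler(crawler), which simply delegates to from_settings: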

    @classmethod
    def from_settings(cls, settings, crawler=None):
        mwlist = cls._get_mwlist_from_settings(settings) # collect all middleware class paths, sorted
        middlewares = []
        enabled = []
        for clspath in mwlist:
            try:
                mwcls = load_object(clspath)
                mw = create_instance(mwcls, settings, crawler)
                middlewares.append(mw)
                enabled.append(clspath)
            except NotConfigured as e:
                if e.args:
                    clsname = clspath.split('.')[-1]
                    logger.warning("Disabled %(clsname)s: %(eargs)s",
                                   {'clsname': clsname, 'eargs': e.args[0]},
                                   extra={'crawler': crawler})

        logger.info("Enabled %(componentname)ss:\n%(enabledlist)s",
                    {'componentname': cls.component_name,
                     'enabledlist': pprint.pformat(enabled)},
                    extra={'crawler': crawler})
        return cls(*middlewares)

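First, _get_mwlist_from_settings collects the middlewares. There are two groups: the ones that ship with Scrapy and the ones we write ourselves (DOWNLOADER_MIDDLEWARES in settings). Besides collecting them, the method also sorts them: it merges both groups into a list of (middleware class path, order number) pairs, sorts it by order number in ascending order, and returns a new list containing only the class paths. Back in from_settings, the loop loads and instantiates each class and appends the instance to middlewares (a middleware that raises NotConfigured is skipped). Finally, return cls(*middlewares) instantiates the class that cls refers to, which here is DownloaderMiddlewareManager itself; its __init__ is inherited from the parent class MiddlewareManager, so that is the initializer to look at: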

    def __init__(self, *middlewares):
        self.middlewares = middlewares
        self.methods = defaultdict(deque)
        for mw in middlewares:
            self._add_middleware(mw)

It creates a dict whose default values are deques, and the loop that follows registers each middleware:

    def _add_middleware(self, mw):
        if hasattr(mw, 'process_request'):
            self.methods['process_request'].append(mw.process_request)
        if hasattr(mw, 'process_response'):
            self.methods['process_response'].appendleft(mw.process_response)
        if hasattr(mw, 'process_exception'):
            self.methods['process_exception'].appendleft(mw.process_exception)

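When we write our own middleware, the actual logic goes into process_request, process_response and process_exception. Here those methods are sorted by kind into the methods dict. deque.append adds at the right (the tail), appendleft adds at the left (the head), and iteration runs from head to tail. As a result, in the downloader middleware chain, the smaller a middleware's order number, the earlier its process_request runs; and the larger its order number, the earlier its process_response (and process_exception) runs. That is how Scrapy orders middleware execution.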
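The two steps above (sort by order number, then register with append/appendleft) can be sketched in plain Python. This is a minimal illustration, not Scrapy's build_component_list or MiddlewareManager: the class paths and RecordingMiddleware are hypothetical, and the None-disables-a-middleware rule mirrors the convention documented for the DOWNLOADER_MIDDLEWARES setting.

```python
from collections import defaultdict, deque


def build_middleware_list(base, custom):
    """Merge {class_path: order} dicts and return enabled paths by ascending order."""
    merged = dict(base)
    merged.update(custom)  # project settings override built-in orders
    # an order of None disables a middleware, so skip those entries
    enabled = sorted((order, path) for path, order in merged.items() if order is not None)
    return [path for _, path in enabled]


class RecordingMiddleware(object):
    """Hypothetical middleware that only records when its hooks run."""
    def __init__(self, name, log):
        self.name = name
        self.log = log

    def process_request(self):
        self.log.append(self.name + '.process_request')

    def process_response(self):
        self.log.append(self.name + '.process_response')


def register(middlewares):
    """Same append/appendleft pattern as _add_middleware."""
    methods = defaultdict(deque)
    for mw in middlewares:
        methods['process_request'].append(mw.process_request)
        methods['process_response'].appendleft(mw.process_response)
    return methods


# Hypothetical class paths and order numbers
base = {'builtin.A': 100, 'builtin.C': 900, 'builtin.D': 700}
custom = {'myproject.B': 500, 'builtin.D': None}  # add B, disable D
order = build_middleware_list(base, custom)
print(order)  # ['builtin.A', 'myproject.B', 'builtin.C']

log = []
methods = register([RecordingMiddleware(path, log) for path in order])
for method in methods['process_request']:
    method()
for method in methods['process_response']:
    method()
print(log)
```

The log shows process_request running A, B, C and process_response running C, B, A: the middleware closest to the downloader (largest number) sees the response first.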

class Scraper(object):

    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)  # instantiate the spider middlewares
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)  # instantiate the item pipelines
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter

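Now the scraper. Clearly both the spider middlewares and the item pipelines are instantiated inside the scraper, so we have now located every component in the Scrapy flowchart. That completes creating the engine.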

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)

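Now engine.open_spider, which does even more. It first wraps engine._next_request in a CallLaterOnce object (a helper that schedules the call on the Twisted reactor), then instantiates the scheduler, passes start_requests through the spider middlewares' process_start_requests (if any middleware implements it; by default none do), creates the slot, and opens the various components.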

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
        dupefilter = create_instance(dupefilter_cls, settings, crawler)
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
        logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS', settings.getbool('SCHEDULER_DEBUG'))
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)

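Let's start with how the scheduler is instantiated. It's not complicated: it instantiates the dupefilter and loads pqclass (the priority-queue class), dqclass (the disk-queue class, LIFO) and mqclass (the memory-queue class, LIFO). Note that dqclass and mqclass are not instantiated directly; instead they are instantiated through a factory passed to pqclass, as we'll see below. Next, slot = Slot(start_requests, close_if_idle, nextcall, scheduler):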

class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set() # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.heartbeat = task.LoopingCall(nextcall.schedule)

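So the scheduler actually lives on engine.slot. nextcall wraps engine._next_request, the method where request scheduling begins, which we'll cover later. heartbeat literally means "heartbeat", but unlike the heartbeat in many frameworks, which reports that a process is alive, Scrapy's heartbeat simply triggers engine._next_request at a fixed interval (5 s, set by the last line of open_spider). Now scheduler.open():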

    def open(self, spider):
        self.spider = spider
        self.mqs = self.pqclass(self._newmq) # wrap mqclass in a priority queue
        self.dqs = self._dq() if self.dqdir else None # off (None) by default; when enabled, dqclass is wrapped in a priority queue too
        return self.df.open()
    
    def _newmq(self, priority):
        return self.mqclass()

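Here self.mqs is created by wrapping mqclass in a priority queue (pqclass holds no queue storage of its own; it only provides the priority logic and creates one inner queue per priority through the _newmq factory). mqclass itself is a LIFO (last in, first out) queue, so self.mqs has both priority ordering and LIFO ordering, and it is precisely the LIFO behavior that makes Scrapy crawl depth-first by default. So how would you make Scrapy crawl breadth-first? Swap in FIFO (first in, first out) queues, of course. The disk queue is unused (None) by default; if you enable it, it is built the same way as mqs. Finally, the dupefilter is opened.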
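To see how the inner queue type decides the crawl order, here is a minimal sketch of the "one inner queue per priority, created by a factory" idea. PriorityQueueSketch, the queue classes and the toy site are hypothetical and much simpler than queuelib's PriorityQueue; like queuelib, the sketch serves the lowest priority number first, which is why Scrapy's scheduler pushes -request.priority. For real projects, Scrapy's FAQ suggests DEPTH_PRIORITY = 1 together with SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleFifoDiskQueue' and SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.FifoMemoryQueue' for breadth-first order.

```python
from collections import deque


class LifoQueue(object):
    def __init__(self):
        self._items = deque()

    def push(self, obj):
        self._items.append(obj)

    def pop(self):
        return self._items.pop()  # newest first

    def __len__(self):
        return len(self._items)


class FifoQueue(LifoQueue):
    def pop(self):
        return self._items.popleft()  # oldest first


class PriorityQueueSketch(object):
    """One inner queue per priority, created on demand by qfactory
    (the role Scheduler._newmq plays). Lowest number is served first."""
    def __init__(self, qfactory):
        self.qfactory = qfactory
        self.queues = {}

    def push(self, obj, priority=0):
        if priority not in self.queues:
            self.queues[priority] = self.qfactory(priority)
        self.queues[priority].push(obj)

    def pop(self):
        if not self.queues:
            return None
        priority = min(self.queues)
        queue = self.queues[priority]
        obj = queue.pop()
        if not queue:
            del self.queues[priority]  # drop empty inner queues
        return obj


def crawl_order(queue_cls):
    # Hypothetical toy site: page -> links found on that page
    site = {'/': ['/a', '/b'], '/a': ['/a1', '/a2'], '/b': ['/b1'],
            '/a1': [], '/a2': [], '/b1': []}
    pq = PriorityQueueSketch(lambda priority: queue_cls())
    pq.push('/')
    order = []
    url = pq.pop()
    while url is not None:
        order.append(url)
        for link in site[url]:
            pq.push(link)
        url = pq.pop()
    return order


print(crawl_order(LifoQueue))  # ['/', '/b', '/b1', '/a', '/a2', '/a1']  depth-first
print(crawl_order(FifoQueue))  # ['/', '/a', '/b', '/a1', '/a2', '/b1']  breadth-first
```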

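Now back to the last two lines of open_spider. slot.nextcall.schedule() puts nextcall onto Twisted's event loop, and slot.heartbeat.start(5), as mentioned, sets the heartbeat interval to 5 seconds, so engine._next_request is triggered every 5 seconds. At this point process.crawl(MeijuSpider) has finished.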
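The reason nextcall and heartbeat can both call schedule() freely is that CallLaterOnce (in scrapy.utils.reactor) queues its function at most once until it has actually run. The sketch below imitates that behavior without Twisted; FakeReactor and CallLaterOnceSketch are hypothetical stand-ins, not Scrapy's implementation.

```python
class FakeReactor(object):
    """Hypothetical stand-in for Twisted's reactor: a list of pending calls."""
    def __init__(self):
        self.pending = []

    def call_soon(self, func):
        self.pending.append(func)

    def run_pending(self):
        # run everything queued so far (one "loop iteration")
        calls, self.pending = self.pending, []
        for func in calls:
            func()


class CallLaterOnceSketch(object):
    """Queue func at most once until it has actually run."""
    def __init__(self, reactor, func, *args):
        self._reactor = reactor
        self._func = func
        self._args = args
        self._scheduled = False

    def schedule(self):
        if self._scheduled:
            return  # already queued: extra schedule() calls are no-ops
        self._scheduled = True
        self._reactor.call_soon(self)

    def __call__(self):
        self._scheduled = False  # reset first so func may schedule again
        return self._func(*self._args)


calls = []
reactor = FakeReactor()
nextcall = CallLaterOnceSketch(reactor, calls.append, 'next_request')
nextcall.schedule()
nextcall.schedule()
nextcall.schedule()
reactor.run_pending()
print(calls)  # ['next_request']
```

Three schedule() calls collapse into a single _next_request run, so the many callbacks that call nextcall.schedule() later in the flow don't flood the event loop.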

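Next is process.start(). There isn't much to say about it: among other things, it caps the size of Twisted's reactor thread pool at REACTOR_THREADPOOL_MAXSIZE (default 10). Scrapy components use this pool for blocking work outside the main flow, such as threaded DNS resolution; the thread pool itself is managed by Twisted. The important part is that this method calls reactor.run(), which starts Twisted's main loop. Now Scrapy really starts moving. The first thing to run is _next_request, which nextcall wraps: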

    def _next_request(self, spider):
        slot = self.slot
        if not slot:
            return

        if self.paused:
            return

        while not self._needs_backout(spider): # back off if the engine isn't running, the slot is closing, or the downloader/scraper is at capacity
            if not self._next_request_from_scheduler(spider): # returns None when the scheduler queue is empty (e.g. on the very first call), ending the loop
                break

        if slot.start_requests and not self._needs_backout(spider):
            try:
                request = next(slot.start_requests)
            except StopIteration:
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)
----------------------------------------------------------------------------------------------------------
    def crawl(self, request, spider):
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        self.schedule(request, spider)
        self.slot.nextcall.schedule()

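The first time we enter this method is necessarily the first time start_requests is iterated. The while loop exits immediately because the scheduler's queue contains no requests yet, so look at the if block that does run: it calls self.crawl(request, spider), the code below the dashed line. That calls the engine's schedule method, which pushes the request into the scheduler's queue (by default the memory queue mqs), and then schedules nextcall (engine._next_request) again. On the next pass through _next_request, the scheduler's queue has a request, so the while loop does real work. After that, if start_requests still has items (when your spider sets start_urls = [url1, url2, ...]), the next one is enqueued, one per pass. Now let's look at self._next_request_from_scheduler(spider) inside the while loop: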

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request() # dequeue: memory queue (mqs) first, then the disk queue (dqs) if mqs is empty
        if not request: # nothing queued: return; this is where the very first call exits
            return
        d = self._download(request, spider) # download
        d.addBoth(self._handle_downloader_output, request, spider) # handle the result: scraper -> spider middlewares -> callback -> item pipelines
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request)) # remove from the in-progress set
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule()) # schedule nextcall again to process the next request
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d

This method first dequeues a request, passes it to the _download method, and then attaches several callbacks. Let's look at _download first:

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request) # add to the in-progress set (engine.slot.inprogress, a set)
        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received, \
                    response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld

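It first adds the request to engine.slot.inprogress, a set that marks the request as being processed. Note that this marks the request as going through the whole flow (everything in the flowchart), not through a single component such as the downloader. Next, self.downloader.fetch(request, spider) obviously hands the request to the downloader component; the callbacks attached afterwards write logs and keep scheduling the requests in the queue. Let's dig into the downloader: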

    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response

        self.active.add(request) # add to the set of requests in the downloader
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate) # when the download finishes, remove the request from the set

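This method updates the downloader's state: it adds the request to the set of requests being downloaded and removes it once the download completes. You may have noticed that by now every component seems to have its own "in progress" set. They make it easy to see what Scrapy is doing and how loaded each component is, so you can spot bottlenecks and optimize; the status report from Scrapy's telnet console (est()) is essentially built from len() of these sets. Back to the main thread: self.middleware.download(self._enqueue_request, request, spider):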

    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        def process_request(request):
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                        'Middleware %s.process_request must return None, Response or Request, got %s' % \
                        (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            defer.returnValue((yield download_func(request=request,spider=spider)))

        @defer.inlineCallbacks
        def process_response(response):
            assert response is not None, 'Received None in process_response'
            if isinstance(response, Request):
                defer.returnValue(response)

            for method in self.methods['process_response']:
                response = yield method(request=request, response=response,
                                        spider=spider)
                assert isinstance(response, (Response, Request)), \
                    'Middleware %s.process_response must return Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if isinstance(response, Request):
                    defer.returnValue(response)
            defer.returnValue(response)

        @defer.inlineCallbacks
        def process_exception(_failure):
            exception = _failure.value
            for method in self.methods['process_exception']:
                response = yield method(request=request, exception=exception,
                                        spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, type(response))
                if response:
                    defer.returnValue(response)
            defer.returnValue(_failure)

        deferred = mustbe_deferred(process_request, request)
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred

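The download method mainly runs the downloader middlewares. It runs process_request first; once every process_request has run without returning a Response or Request, it calls the downloader._enqueue_request passed in as download_func. Then, depending on whether the download succeeded, it runs process_response or process_exception. Because the errback is attached before the callback, a Response returned from process_exception still goes through process_response.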
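The control flow of download() can be sketched synchronously, without Twisted and without the exception path. ToyRequest, ToyResponse, the cache middleware and fake_download are all hypothetical; the point is the short-circuit in process_request and the fact that every Response, including one returned early, still passes through process_response.

```python
class ToyRequest(object):
    """Hypothetical stand-in for scrapy.Request."""
    def __init__(self, url):
        self.url = url


class ToyResponse(object):
    """Hypothetical stand-in for scrapy.http.Response."""
    def __init__(self, url, body):
        self.url = url
        self.body = body
        self.flags = []


def download(process_request_methods, process_response_methods, download_func, request):
    """Synchronous sketch of the downloader-middleware chain (success path only)."""
    response = None
    for method in process_request_methods:
        response = method(request)
        if response is not None:  # short-circuit: skip the remaining process_request and the download
            break
    if response is None:
        response = download_func(request)
    if isinstance(response, ToyRequest):
        return response  # a new request skips process_response and goes back to the engine
    for method in process_response_methods:
        response = method(request, response)
        if isinstance(response, ToyRequest):
            return response
    return response


cache = {'https://example.com/cached': ToyResponse('https://example.com/cached', b'from cache')}
network_calls = []


def cache_mw(request):  # hypothetical cache middleware (process_request)
    return cache.get(request.url)


def fake_download(request):  # hypothetical stand-in for Downloader._enqueue_request
    network_calls.append(request.url)
    return ToyResponse(request.url, b'from network')


def tag_mw(request, response):  # hypothetical process_response that marks every response
    response.flags.append('seen')
    return response


r1 = download([cache_mw], [tag_mw], fake_download, ToyRequest('https://example.com/cached'))
r2 = download([cache_mw], [tag_mw], fake_download, ToyRequest('https://example.com/new'))
print(r1.body, r1.flags)  # b'from cache' ['seen']
print(r2.body, r2.flags)  # b'from network' ['seen']
print(network_calls)      # ['https://example.com/new']
```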

    def _enqueue_request(self, request, spider):
        key, slot = self._get_slot(request, spider)
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        self.signals.send_catch_log(signal=signals.request_reached_downloader,
                                    request=request,
                                    spider=spider)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred)) # append to the slot's pending queue (a deque)
        self._process_queue(spider, slot)
        return deferred

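It starts by getting the key (the hostname) and its slot. Until now downloader.slots has been empty; slots are actually created inside self._get_slot(request, spider). If no slot exists for the request, that method creates one, using the hostname of the request's URL as the key (or its IP address when CONCURRENT_REQUESTS_PER_IP is set), stores it as downloader.slots[key] = slot, and returns the key and the slot. This one-slot-per-hostname design is what makes per-domain concurrency limits easy to implement. Continuing, _enqueue_request tags the request with its slot key (request.meta['download_slot']), adds the request to the slot's active set, enqueues it, and finally calls downloader._process_queue.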
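A trimmed-down sketch of the one-slot-per-hostname idea follows. SlotSketch and get_slot are hypothetical simplifications of the downloader's Slot class and _get_slot method; only free_transfer_slots() follows the real formula (concurrency minus the number of requests transferring). The sketch ignores the IP-based key and the download_slot meta override.

```python
from collections import deque
from urllib.parse import urlparse


class SlotSketch(object):
    """Hypothetical, trimmed-down per-host slot (not Scrapy's real Slot class)."""
    def __init__(self, concurrency):
        self.concurrency = concurrency
        self.active = set()        # requests that have entered this slot
        self.queue = deque()       # requests waiting to be downloaded
        self.transferring = set()  # requests currently downloading

    def free_transfer_slots(self):
        return self.concurrency - len(self.transferring)


def get_slot(slots, url, concurrency=8):
    """Return (key, slot), creating the slot the first time a hostname is seen."""
    key = urlparse(url).hostname or ''
    if key not in slots:
        slots[key] = SlotSketch(concurrency)
    return key, slots[key]


slots = {}
k1, s1 = get_slot(slots, 'https://www.meijutt.com/a.html')
k2, s2 = get_slot(slots, 'https://www.meijutt.com/b.html')
k3, s3 = get_slot(slots, 'https://example.org/')
print(k1, s1 is s2, s1 is s3, sorted(slots))
# www.meijutt.com True False ['example.org', 'www.meijutt.com']
```

Because both meijutt.com URLs share one slot, its concurrency (CONCURRENT_REQUESTS_PER_DOMAIN, default 8) caps them together, while example.org gets an independent budget.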

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()
        if delay: 
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot
        while slot.queue and slot.free_transfer_slots() > 0:
            slot.lastseen = now
            request, deferred = slot.queue.popleft() # dequeue
            dfd = self._download(slot, request, spider) # download
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

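If a delay is configured, processing waits first. Note that, with RANDOMIZE_DOWNLOAD_DELAY enabled, the delay is drawn at random between 0.5 and 1.5 times the configured value. After that, as long as the queue has requests and the number of requests being downloaded hasn't reached the slot's limit, requests are taken from the queue and passed to downloader._download. When a delay is set, only one request is sent per pass, to avoid bursts.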
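The delay arithmetic in _process_queue is easy to check by hand. The sketch below reproduces the 0.5x-1.5x randomization and the penalty = delay - now + lastseen formula from the code above; the function names and the timeline are hypothetical.

```python
import random


def download_delay(delay, randomize=True):
    """Per-request delay, using the 0.5x-1.5x randomization described above."""
    if randomize:
        return random.uniform(0.5 * delay, 1.5 * delay)
    return delay


def penalty(delay, now, lastseen):
    """Seconds the slot must still wait before sending its next request."""
    return delay - now + lastseen


# Hypothetical timeline: the last request left at t=100.0 s, DOWNLOAD_DELAY = 2
print(penalty(2.0, now=100.5, lastseen=100.0))  # 1.5 -> reschedule _process_queue in 1.5 s
print(penalty(2.0, now=103.0, lastseen=100.0))  # -1.0 -> no wait, send now
samples = [download_delay(2.0) for _ in range(1000)]
print(min(samples) >= 1.0, max(samples) <= 3.0)  # True True
```

A positive penalty becomes the reactor.callLater(penalty, ...) wait; zero or negative means the slot has already waited long enough.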

    def _download(self, slot, request, spider):

        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded, response=response, request=request, spider=spider)
            return response

        dfd.addCallback(_downloaded)
        slot.transferring.add(request) # mark the request as being downloaded

        def finish_transferring(_):
            slot.transferring.remove(request) 
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)

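At the very start, the method calls self.handlers.download_request, which is Scrapy's lowest-level download method: it picks the download handler that matches the request's scheme and performs the download. A few calls further down is Twisted's own download implementation, so that's as deep as we'll go. Back in downloader._download: it marks the request as being downloaded and, once the download completes, removes the mark and continues downloading the requests in the queue. There are quite a few of these markers, so if you're getting confused, here is a summary:
downloader.active (set): requests that have entered the downloader component
downloader.slots[key].active: requests that have entered a slot
downloader.slots[key].transferring: requests currently being downloaded
downloader.slots[key].queue: requests waiting to be downloaded
OK, the download is done. Afterwards the response goes through the downloader middlewares' process_response, as described earlier. Now let's follow the response after the downloader middlewares, i.e. _handle_downloader_output, which was attached earlier in engine._next_request_from_scheduler: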

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request): # a Request (e.g. a redirect) is scheduled for crawling again
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider) # hand it to the scraper
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

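OK, Scrapy's own comments in this method explain things well: the result may be a Request object, for example on a 301 or 302 redirect, in which case engine.crawl is called to schedule it again. Let's focus on the code that hands the response to the scraper, self.scraper.enqueue_scrape(response, request, spider):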

    def enqueue_scrape(self, response, request, spider):
        slot = self.slot
        dfd = slot.add_response_request(response, request) # enqueue
        def finish_scraping(_):
            slot.finish_response(response, request)
            self._check_if_closing(spider, slot)
            self._scrape_next(spider, slot)
            return _
        dfd.addBoth(finish_scraping)
        dfd.addErrback(
            lambda f: logger.error('Scraper bug processing %(request)s',
                                   {'request': request},
                                   exc_info=failure_to_exc_info(f),
                                   extra={'spider': spider}))
        self._scrape_next(spider, slot)
        return dfd

It first enqueues the response and attaches a few callbacks, then calls scraper._scrape_next:

    def _scrape_next(self, spider, slot):
        while slot.queue:
            response, request, deferred = slot.next_response_request_deferred() # dequeue the response
            self._scrape(response, request, spider).chainDeferred(deferred) # process it; the result fires the waiting deferred

OK, after dequeuing, the response is handed to the next method:

    def _scrape(self, response, request, spider):
        """Handle the downloaded response or failure through the spider
        callback/errback"""
        assert isinstance(response, (Response, Failure))

        dfd = self._scrape2(response, request, spider) # returns spiders processed output
        dfd.addErrback(self.handle_spider_error, request, response, spider)
        dfd.addCallback(self.handle_spider_output, request, response, spider)
        return dfd

The arguments are passed on to scraper._scrape2, and then a couple of callbacks are attached. self.handle_spider_output sits at the end of the flow, so we'll analyze it last. First, scraper._scrape2:

    def _scrape2(self, request_result, request, spider):
        """Handle the different cases of request's result been a Response or a
        Failure"""
        if not isinstance(request_result, Failure):
            return self.spidermw.scrape_response(
                self.call_spider, request_result, request, spider)
        else:
            # FIXME: don't ignore errors in spider middleware
            dfd = self.call_spider(request_result, request, spider)
            return dfd.addErrback(
                self._log_download_errors, request_result, request, spider)

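A successful download takes the if branch and a failure takes the else branch; either way, scraper.call_spider is eventually called. We'll only follow the success path.
In the else branch (a failed download), the Failure bypasses the spider middlewares (note the FIXME) and goes straight to call_spider, which ends up calling request.errback. Since the success path goes through similar handling, we won't discuss the failure details further. The success path calls spidermw.scrape_response: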

    def scrape_response(self, scrape_func, response, request, spider):
        fname = lambda f:'%s.%s' % (
                six.get_method_self(f).__class__.__name__,
                six.get_method_function(f).__name__)

        def process_spider_input(response):
            for method in self.methods['process_spider_input']:
                try:
                    result = method(response=response, spider=spider)
                    assert result is None, \
                            'Middleware %s must returns None or ' \
                            'raise an exception, got %s ' \
                            % (fname(method), type(result))
                except:
                    return scrape_func(Failure(), request, spider)
            return scrape_func(response, request, spider)

        def process_spider_exception(_failure):
            exception = _failure.value
            for method in self.methods['process_spider_exception']:
                result = method(response=response, exception=exception, spider=spider)
                assert result is None or _isiterable(result), \
                    'Middleware %s must returns None, or an iterable object, got %s ' % \
                    (fname(method), type(result))
                if result is not None:
                    return result
            return _failure

        def process_spider_output(result):
            for method in self.methods['process_spider_output']:
                result = method(response=response, result=result, spider=spider)
                assert _isiterable(result), \
                    'Middleware %s must returns an iterable object, got %s ' % \
                    (fname(method), type(result))
            return result

        dfd = mustbe_deferred(process_spider_input, response)
        dfd.addErrback(process_spider_exception)
        dfd.addCallback(process_spider_output)
        return dfd

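This runs the spider middlewares, and once the process_spider_input middlewares are done it calls the scraper.call_spider that was passed in as scrape_func. As for the process_spider_output middlewares, the built-in ones do things like recording the crawl depth on the requests the spider yields (DepthMiddleware) and setting the Referer header (RefererMiddleware). Next, scraper.call_spider: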

    def call_spider(self, result, request, spider):
        result.request = request
        dfd = defer_result(result)
        dfd.addCallbacks(request.callback or spider.parse, request.errback)
        return dfd.addCallback(iterate_spider_output)

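Look at the third line of the method: if the request has a callback, it is added as the callback; otherwise the parse method we implemented in our spider (MeijuSpider) is used. request.errback is added as the errback. Both request.callback and request.errback default to None. So we finally see that our own parse method is called from within the scraper. With the spider middlewares covered, next come the item pipelines, via the scraper.handle_spider_output callback mentioned earlier: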

    def handle_spider_output(self, result, request, response, spider):
        if not result:
            return defer_succeed(None)
        it = iter_errback(result, self.handle_spider_error, request, response, spider)
        dfd = parallel(it, self.concurrent_items,
            self._process_spidermw_output, request, response, spider)
        return dfd

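Here result is whatever the callback returned (request.callback, parse, or request.errback) after the process_spider_output middlewares. handle_spider_output iterates over it and processes up to CONCURRENT_ITEMS outputs in parallel, each through scraper._process_spidermw_output: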

    def _process_spidermw_output(self, output, request, response, spider):
        """Process each Request/Item (given in the output parameter) returned
        from the given spider
        """
        if isinstance(output, Request):
            self.crawler.engine.crawl(request=output, spider=spider)
        elif isinstance(output, (BaseItem, dict)):
            self.slot.itemproc_size += 1
            dfd = self.itemproc.process_item(output, spider) # 走itemPipeLine中间件
            dfd.addBoth(self._itemproc_finished, output, response, spider)
            return dfd
        elif output is None:
            pass
        else:
            typename = type(output).__name__
            logger.error('Spider must return Request, BaseItem, dict or None, '
                         'got %(typename)r in %(request)s',
                         {'request': request, 'typename': typename},
                         extra={'spider': spider})

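Each output (an element of the result above) is handled by type: a Request goes back into crawl scheduling via engine.crawl; an item (BaseItem) or a dict goes through the final stage, the item pipelines. None is ignored, and anything else is logged as an error.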
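The type-based routing in _process_spidermw_output can be mirrored with a small dispatcher. ToyRequest and route_output are hypothetical; the lists stand in for engine.crawl, itemproc.process_item and the error log. A plain string (for example, raw text extracted in parse) lands in the error branch, which is why callbacks should yield dicts or items.

```python
class ToyRequest(object):
    """Hypothetical stand-in for scrapy.Request."""
    def __init__(self, url):
        self.url = url


def route_output(output, scheduled, items, errors):
    """Mirror the isinstance checks of _process_spidermw_output."""
    if isinstance(output, ToyRequest):
        scheduled.append(output.url)          # Scrapy: engine.crawl(request=output, ...)
    elif isinstance(output, dict):
        items.append(output)                  # Scrapy: itemproc.process_item(output, spider)
    elif output is None:
        pass                                  # silently ignored
    else:
        errors.append(type(output).__name__)  # Scrapy logs "Spider must return ..."


scheduled, items, errors = [], [], []
for out in [ToyRequest('https://www.meijutt.com/page2'), {'title': 'x'}, None, 'plain string']:
    route_output(out, scheduled, items, errors)
print(scheduled, items, errors)
# ['https://www.meijutt.com/page2'] [{'title': 'x'}] ['str']
```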

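Having walked through Scrapy once, we can see that it is an asynchronous framework. Threads only show up outside the main flow (Twisted's thread pool, used for blocking work such as DNS resolution); the main flow itself runs on Twisted's asynchronous event loop. CONCURRENT_REQUESTS therefore doesn't control threads; think of it as something like the number of coroutines in flight. Go ahead and turn it up, my friend.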
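As an analogy for "concurrency without threads", the sketch below uses Python's asyncio in place of Twisted (a swapped-in technique, not Scrapy's code). A semaphore sized like CONCURRENT_REQUESTS caps how many fake downloads are in flight, and every download runs on the same thread. fake_download and the URLs are hypothetical.

```python
import asyncio
import threading

CONCURRENT_REQUESTS = 16  # reuses the Scrapy setting's name purely for illustration


async def fake_download(url, sem, stats):
    async with sem:  # at most CONCURRENT_REQUESTS downloads in flight
        stats['in_flight'] += 1
        stats['peak'] = max(stats['peak'], stats['in_flight'])
        stats['threads'].add(threading.get_ident())
        await asyncio.sleep(0.01)  # stands in for waiting on the network
        stats['in_flight'] -= 1
        return url


async def crawl(urls):
    sem = asyncio.Semaphore(CONCURRENT_REQUESTS)
    stats = {'in_flight': 0, 'peak': 0, 'threads': set()}
    results = await asyncio.gather(*(fake_download(u, sem, stats) for u in urls))
    return results, stats


results, stats = asyncio.run(crawl(['https://example.com/%d' % i for i in range(100)]))
print(len(results), stats['peak'], len(stats['threads']))
```

The peak in-flight count never exceeds CONCURRENT_REQUESTS, yet only one thread identifier is ever recorded: the concurrency comes from interleaving waits on a single event loop, much like Twisted's reactor in Scrapy.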
