Roadmap:
- Basic Request/Response (done)
- Implement asynchronous downloading (done)
- Add a Queue in preparation for the scheduler (done)
- Add engine management (done)
- Add scheduler management (this section)
- Add downloader management
- Add downloader middleware management
- Add spider process management
- Add signal management
In the previous section we added the engine. All it really did was take three steps (seeding the queue with the initial requests, fetching requests from the queue and downloading them, and handling the download results) and weave them into a single module with asynchronous logic. Of course, it is no match for the real Scrapy framework.
In this section we add one more module: the scheduler.
Let's first look at the overall structure:
class Scheduler:...
class CallLaterOnce:...
class Slot:...
class Engine:...
class Request:...
class Response:...
class MySpider:...
Several of these classes should look familiar by now; let's go through them:
- MySpider: the class that holds the spider logic
class MySpider:
    start_urls = ['http://www.czasg.xyz', 'http://www.czasg.xyz', 'http://www.czasg.xyz', 'http://www.czasg.xyz',
                  'http://www.czasg.xyz', 'http://www.czasg.xyz', 'http://www.czasg.xyz', 'http://www.czasg.xyz']

    def start_requests(self):
        yield from (Request(url, self.parse) for url in self.start_urls)

    def parse(self, response):
        print(response.url)
- Request / Response: objects that carry a request and its result, unchanged from the earlier sections; see the sketch just below.
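Their bodies are elided in this section, so here is a minimal sketch reconstructed purely from how they are used in this file (Request carries a url and a callback; Response wraps the downloaded bytes and exposes its request's url); the real definitions live in the earlier sections:

class Request:
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class Response:
    def __init__(self, byte_content, request):
        self.byte_content = byte_content
        self.request = request
        self.url = request.url  # parse() reads response.url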
- Engine: the engine. Compared with the previous section, it is not hard to guess that this code changes somewhat: the Queue handling needs to be split out and folded into the scheduler module. Let's look at how this module is redesigned.
class Engine:
    def __init__(self):...
    @defer.inlineCallbacks
    def start(self):...
    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):...
    def _next_request(self, spider):...
    def _next_request_from_scheduler(self, spider):...
    def _handle_downloader_output(self, byte_content, request, spider):...
We can see a new method, _next_request_from_scheduler. From the name alone we can guess most of what it does: fetch Requests from the scheduler. Let's go straight to the code. The other modules have been adjusted accordingly as well, but the basic functionality stays the same.
# imports for everything in this post (it all lives in a single file)
from queue import Queue, Empty

from twisted.internet import defer, reactor
from twisted.web.client import getPage  # Twisted's old-style page fetcher, as in earlier sections


class Engine:
    def __init__(self):
        self.max_pool_size = 4
        self.crawling = []
        self.slot = None
        self.running = True

    @defer.inlineCallbacks
    def start(self):
        # a handle that _next_request fires once all work is done
        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = Scheduler.from_crawler()
        self.slot = Slot(start_requests, nextcall, scheduler)
        yield scheduler.open()
        self.slot.nextcall.schedule()

    def _next_request(self, spider):
        slot = self.slot
        if not slot:
            return
        # first, drain the scheduler queue
        while not slot.scheduler.isEmpty():
            if not self._next_request_from_scheduler(spider):
                break
        # then feed it one more request from start_requests
        if slot.start_requests:
            try:
                request = next(slot.start_requests)
                slot.inprogress.append(request)
            except StopIteration:
                slot.start_requests = None
            else:
                slot.scheduler.enqueue_request(request)
                slot.nextcall.schedule()
        # nothing left to seed, schedule or download: close the crawl
        if slot.start_requests is None and slot.scheduler.isEmpty() and not slot.inprogress:
            self._closewait.callback(None)

    def _next_request_from_scheduler(self, spider):
        request = self.slot.scheduler.next_request()
        if not request:
            return
        dfd = getPage(request.url.encode())
        dfd.addBoth(self._handle_downloader_output, request, spider)
        dfd.addBoth(lambda _: self.slot.inprogress.remove(request))
        dfd.addBoth(lambda _: self.slot.nextcall.schedule())
        return dfd

    def _handle_downloader_output(self, byte_content, request, spider):
        response = Response(byte_content, request)
        request.callback(response)
- Slot: an internal bookkeeping class for the engine. It holds the start_requests entry iterator, the scheduler, and the nextcall trigger, and it maintains a list of requests currently in progress.
class Slot:
    def __init__(self, start_requests, nextcall, scheduler):
        self.start_requests = iter(start_requests)
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.inprogress = []
- CallLaterOnce: a "magic" helper lifted from the Scrapy source code. Roughly, the class is callable: invoking an instance runs its __call__ method, so schedule() registers the instance itself with the reactor, and each call to schedule() triggers the instance once on the next reactor tick, which in turn executes __call__. The _call is None guard means that however many times schedule() is called before that tick fires, the wrapped function still runs only once; hence the "Once" in the name.
class CallLaterOnce:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self._call = None

    def schedule(self):
        # arm at most one pending call at a time
        if self._call is None:
            self._call = reactor.callLater(0, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self, *args, **kwargs):
        self._call = None
        return self.func(*self.args, **self.kwargs)
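A quick standalone check of that coalescing behaviour (a throwaway snippet of mine, not part of the tutorial file): three schedule() calls within the same tick still produce a single invocation.

from twisted.internet import reactor

def tick():
    print('tick')  # printed once, not three times

call = CallLaterOnce(tick)
call.schedule()
call.schedule()  # coalesced: _call is already armed
call.schedule()
reactor.callLater(0.1, reactor.stop)
reactor.run()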
- Scheduler: the scheduler. This module simply maintains a queue and implements a few operations on it.
class Scheduler:
    def __init__(self):
        self.mq = Queue()

    @classmethod
    def from_crawler(cls):
        return cls()

    def open(self):
        return

    def next_request(self):
        # return None on an empty queue instead of raising queue.Empty,
        # matching the engine's `if not request` check
        try:
            return self.mq.get(block=False)
        except Empty:
            return None

    def enqueue_request(self, request):
        self.mq.put(request)

    def isEmpty(self):
        return self.mq.qsize() == 0
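On its own the scheduler behaves like a plain FIFO queue. A throwaway sanity check (using the Request sketch from above):

scheduler = Scheduler.from_crawler()
scheduler.enqueue_request(Request('http://www.czasg.xyz', print))
print(scheduler.isEmpty())       # False
request = scheduler.next_request()
print(request.url)               # http://www.czasg.xyz
print(scheduler.isEmpty())       # True
print(scheduler.next_request())  # None, queue drained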
Next comes how to start it all:
if __name__ == '__main__':
    @defer.inlineCallbacks
    def crawl():
        spider = MySpider()
        engine = Engine()
        yield engine.open_spider(spider, spider.start_requests())
        yield engine.start()

    d = crawl()
    d.addBoth(lambda _: reactor.stop())
    reactor.run()
Heh, the startup is actually the same as in the previous section.
- yield engine.open_spider(spider, spider.start_requests()): initializes the engine, including binding the CallLaterOnce helper, creating the Scheduler, building the Slot wrapper, and so on.
- yield engine.start(): obtains a Deferred handle, onto which reactor.stop() is chained so the crawler can shut down (see the sketch right after this list).
If the host is reachable, running the file should print http://www.czasg.xyz eight times and then exit.
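That handle pattern is worth seeing in miniature. Below is a hedged sketch of just the mechanism (the names wait_for_close and closewait are mine, not from the tutorial): a coroutine suspends on a bare Deferred, something fires it later, and the addBoth hook stops the reactor, exactly as start() and _closewait do above.

from twisted.internet import defer, reactor

@defer.inlineCallbacks
def wait_for_close():
    closewait = defer.Deferred()
    reactor.callLater(0.1, closewait.callback, None)  # stand-in for "all work is done"
    yield closewait  # suspend here until the handle fires

wait_for_close().addBoth(lambda _: reactor.stop())
reactor.run()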
Now let's go over the key points:
@defer.inlineCallbacks
def open_spider(self, spider, start_requests):
    nextcall = CallLaterOnce(self._next_request, spider)
    scheduler = Scheduler.from_crawler()
    self.slot = Slot(start_requests, nextcall, scheduler)
    yield scheduler.open()
    self.slot.nextcall.schedule()
In the code above, CallLaterOnce(self._next_request, spider) is quite interesting: it binds the function self._next_request, and the final line calls self.slot.nextcall.schedule() once, which at the code level simply means running self._next_request once.
def _next_request(self, spider):
    slot = self.slot
    if not slot:
        return
    while not slot.scheduler.isEmpty():
        if not self._next_request_from_scheduler(spider):
            break
    if slot.start_requests:
        try:
            request = next(slot.start_requests)
            slot.inprogress.append(request)
        except StopIteration:
            slot.start_requests = None
        else:
            slot.scheduler.enqueue_request(request)
            slot.nextcall.schedule()
    if slot.start_requests is None and slot.scheduler.isEmpty() and not slot.inprogress:
        self._closewait.callback(None)
The function first drains the scheduler: while the queue is not empty it keeps calling self._next_request_from_scheduler(spider), squeezing the queue dry. Further down it pulls one more Request object, request = next(slot.start_requests), pushes it into the scheduler's queue with slot.scheduler.enqueue_request(request), and re-arms itself via slot.nextcall.schedule(). The net effect is that start requests trickle in one per reactor tick: seed one, drain, seed the next, until start_requests is exhausted, the queue is empty and no download is in progress, at which point _closewait fires and the crawl shuts down.
Emmm... this little dance really does take a moment to savor.
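To watch that rhythm without touching the network, here is a throwaway experiment (mine, not part of the tutorial): subclass Engine and replace getPage with defer.succeed, a Deferred that has already fired. Run it with the same __main__ harness as above, substituting TracingEngine for Engine, and the alternation becomes visible: one request seeded, then drained, per reactor tick.

class TracingEngine(Engine):
    def _next_request_from_scheduler(self, spider):
        request = self.slot.scheduler.next_request()
        if not request:
            return
        print('drain ->', request.url)
        dfd = defer.succeed(b'fake body')  # pre-fired Deferred instead of a real download
        dfd.addBoth(self._handle_downloader_output, request, spider)
        dfd.addBoth(lambda _: self.slot.inprogress.remove(request))
        dfd.addBoth(lambda _: self.slot.nextcall.schedule())
        return dfd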
github地址:https://github.com/CzaOrz/ioco/blob/t426/open_source_project/scrapy_simulate_tutorial/5.py