Scrapy's core components include:
- spider
- scheduler
- middleware
- item pipelines
- engine
The Scrapy execution flow is shown in the figure.
The spider sends requests to the engine. It is important to be clear that Scrapy is single-threaded, not multi-threaded: at its core it runs on an event loop (Twisted's reactor, which is driven by epoll/select-style I/O multiplexing). The engine acts like the heart of the framework, keeping Scrapy running; every request has to pass through this component.
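To make the single-threaded model concrete, here is a minimal sketch (plain Twisted, not Scrapy code) of how one reactor thread interleaves independent tasks; the function tick and the names request-a / request-b are made up for illustration:

from twisted.internet import reactor

def tick(name, n):
    # each callback runs to completion on the single reactor thread
    print(f'{name} tick {n}')
    if n < 3:
        reactor.callLater(0.1, tick, name, n + 1)

reactor.callLater(0, tick, 'request-a', 1)
reactor.callLater(0, tick, 'request-b', 1)
reactor.callLater(1, reactor.stop)
reactor.run()  # the event loop (select/epoll under the hood) drives everything in one thread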
Requests sent from the spider to the scheduler pass through the middleware on the way. Inside the scheduler, the enqueue_request method calls the dupefilter's request_seen:
def enqueue_request(self, request):
    # drop the request if it is filterable and the dupefilter has already seen it
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    # try the disk queue first (used when JOBDIR is set and the request is serializable),
    # otherwise fall back to the in-memory queue
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True
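As the check above shows, a request created with dont_filter=True bypasses request_seen entirely and is always scheduled. A minimal sketch of this in a spider (the callback name parse_again is just an illustrative placeholder):

import scrapy

class DemoSpider(scrapy.Spider):
    name = 'demo'
    start_urls = ['http://example.com']

    def parse(self, response):
        # dont_filter=True: enqueue_request skips the dupefilter,
        # so the same URL can be scheduled again
        yield scrapy.Request(response.url, callback=self.parse_again, dont_filter=True)

    def parse_again(self, response):
        self.logger.info('revisited %s', response.url)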
Here is the request_seen method (from RFPDupeFilter), which ultimately calls request_fingerprint:
def request_seen(self, request):
    fp = self.request_fingerprint(request)
    if fp in self.fingerprints:
        return True
    self.fingerprints.add(fp)
    if self.file:
        # with JOBDIR enabled, fingerprints are persisted to the requests.seen file
        self.file.write(fp + '\n')

def request_fingerprint(self, request):
    return request_fingerprint(request)
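request_seen returns True for a duplicate and implicitly returns None (falsy) otherwise. The dupefilter can also be swapped out via the DUPEFILTER_CLASS setting; a minimal sketch of a subclass that reuses the same fingerprint logic (the module path myproject.dupefilters is an assumed example):

from scrapy.dupefilters import RFPDupeFilter

class LoggingDupeFilter(RFPDupeFilter):
    """Same fingerprint logic, but logs every duplicate it drops."""

    def request_seen(self, request):
        seen = super().request_seen(request)
        if seen:
            self.logger.debug('duplicate request dropped: %s', request.url)
        return seen

# settings.py (assumed project layout):
# DUPEFILTER_CLASS = 'myproject.dupefilters.LoggingDupeFilter'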
The core of request_fingerprint (in scrapy.utils.request), which filters duplicates by hashing each request with SHA1:
import hashlib
from weakref import WeakKeyDictionary

from w3lib.url import canonicalize_url
from scrapy.utils.python import to_bytes

_fingerprint_cache = WeakKeyDictionary()

def request_fingerprint(request, include_headers=None, keep_fragments=False):
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    cache_key = (include_headers, keep_fragments)
    if cache_key not in cache:
        # SHA1 over the canonicalized request: method + URL + body (+ selected headers)
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url, keep_fragments=keep_fragments)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[cache_key] = fp.hexdigest()
    return cache[cache_key]
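Because the URL is canonicalized (query parameters sorted, fragment dropped by default), requests that differ only superficially hash to the same fingerprint. A quick sketch, using example.com as a placeholder URL:

from scrapy import Request
from scrapy.utils.request import request_fingerprint

r1 = Request('http://example.com/page?a=1&b=2')
r2 = Request('http://example.com/page?b=2&a=1#section')  # reordered query, extra fragment

# canonicalize_url sorts the query string and strips the fragment,
# so both requests produce the same SHA1 fingerprint
print(request_fingerprint(r1) == request_fingerprint(r2))  # True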
For typical data volumes this approach filters duplicates effectively; for much larger crawls a Bloom filter can be used instead, which will be introduced later.