The previous article covered replacing the scrapy-redis queue with an in-memory queue. This article deals with the problem that an in-memory queue loses its data when the program restarts.
Solution
When the program stops normally, the requests still sitting in the queue can be pushed back into redis, so that the next time the program starts it can continue processing them.
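For context, this works because a scrapy-redis spider reads its start requests from the "<spider name>:start_urls" list in redis, which is exactly the key the extension below writes back to. A minimal sketch, assuming the project uses scrapy-redis (the spider name and class here are hypothetical):

# Hypothetical spider, for illustration only.
from scrapy_redis.spiders import RedisSpider


class OcrSpider(RedisSpider):
    name = 'ocr'
    # With no redis_key set, start requests are read from 'ocr:start_urls',
    # the same list the shutdown extension pushes unfinished requests into.

    def parse(self, response):
        yield {'url': response.url}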
When writing middlewares and pipelines you often come across a spider_closed method; it is a handler for the spider_closed signal and only runs when the program shuts down normally. So all we need to do is implement this handler and, at shutdown, save the remaining data to redis.
Code
import logging
import time

from scrapy import signals
from scrapy.exceptions import NotConfigured


class StopExtension(object):

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def __init__(self, crawler):
        self.crawler = crawler
        # This setting is only used to switch the extension on or off.
        self.interval = crawler.settings.getfloat('CORE_METRICS_INTERVAL', 5)
        if not self.interval:
            raise NotConfigured
        cs = crawler.signals
        cs.connect(self.spider_closed, signal=signals.spider_closed)

    def set_redis_pipeline_right(self, server, key_name, data_list):
        # Push the data back into redis, retrying up to 5 times.
        begin_time = time.time()
        for _ in range(5):
            try:
                with server.pipeline(transaction=False) as pipe:
                    for data in data_list:
                        pipe.rpush(key_name, data)
                    pipe.execute()
                logging.info(f'ocrStopExtension insert redis time:{time.time() - begin_time}')
                return
            except Exception:
                logging.exception('ocrStopExtension failed to write to redis, retrying')

    def handle_data(self, request):
        # Serialize the request here into whatever format the spider
        # expects to read back from redis (e.g. its URL).
        return ""

    def spider_closed(self, spider):
        engine = self.crawler.engine
        server = spider.server
        data_list = []
        # Drain the requests still waiting in the scheduler's memory queue.
        while engine.slot.scheduler.queue:
            request = engine.slot.scheduler.queue.pop()
            data_list.append(self.handle_data(request))
            logging.info(f"ocrStopExtension scheduler--> {request}")
        # Also collect the requests that were in flight in the downloader.
        for request in self.crawler.engine.downloader.active:
            data_list.append(self.handle_data(request))
            logging.info(f"ocrStopExtension downloader--> {request}")
        self.set_redis_pipeline_right(server, "%s:start_urls" % spider.name, data_list)
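The extension still has to be enabled in the project settings. A minimal sketch, assuming the class lives in a module such as myproject.extensions (the module path is a placeholder):

# settings.py -- minimal sketch; the module path below is a placeholder.
EXTENSIONS = {
    'myproject.extensions.StopExtension': 500,
}

# The extension disables itself (raises NotConfigured) when this is 0.
CORE_METRICS_INTERVAL = 5

Note that spider_closed does not fire if the process is killed forcibly (e.g. kill -9), so this only protects against a normal shutdown.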