解决改成内存队列数据丢失问题

上篇文章介绍了将scrapy-redis队列,改成内存队列。这篇主要解决内存队列当程序重启时数据丢失的问题。

解决方案

当程序正常停止时,可以将队列中的数据重新放到redis里,当程序启动时就可以继续处理这个请求。
我们在写中间件以及piplines时,经常会看到spider_closed方法。此方法只有在程序正常关闭时,才会执行。所以,我们只需在再程序关闭时,重写这个方法,将数据保存到redis里即可。

代码

from scrapy import signals
from scrapy.exceptions import NotConfigured


class StopExtension(object):

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def __init__(self, crawler):
        self.crawler = crawler
        self.interval = crawler.settings.getfloat('CORE_METRICS_INTERVAL', 5)
        if not self.interval:
            raise NotConfigured
        cs = crawler.signals

        cs.connect(self.spider_closed, signal=signals.spider_closed)

    def set_redis_pipeline_right(self, server, key_name, data_list):
        #  将数据保存redis
        begain_time = time.time()
        for _ in range(5):
            try:
                with server.pipeline(transaction=False) as pipe:
                    for data in data_list:
                        pipe.rpush(key_name, data)
                    pipe.execute()
                    logging.info(f'ocrStopExtension insert redis time:{time.time() - begain_time}')
                return
            except:
                pass

    def handle_data(self, request):
        # 在这里可以对request进行处理,
        return ""

    def spider_closed(self, spider):
        engine = self.crawler.engine
        server = spider.server
        data_list = []
        while engine.slot.scheduler.queue:
            request = engine.slot.scheduler.queue.pop()
            data_list.append(self.handle_data(request))
            logging.info(f"ocrStopExtension 调度器--> {request}")

        for request in self.crawler.engine.downloader.active:
            data_list.append(self.handle_data(request))
            logging.info(f"ocrStopExtension 下载器--> {request}")
        self.set_redis_pipeline_right(server, "%s:start_urls" % spider.name, data_list)


©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容