Switching the scrapy-redis request queue to an in-memory queue

Why switch from the Redis queue to an in-memory queue

In our company project, large batches of tasks arrive at the same time, and Redis frequently hit connection timeouts and connection failures; tasks backed up faster than they could be processed.
To relieve the pressure on Redis, the request queue was moved into memory. With the queue in memory, a large number of Redis round-trips are eliminated, which also speeds up the crawler.

Drawbacks of the in-memory queue

1. If the process restarts while requests are still waiting in the crawl queue, everything held in memory is lost.
2. scrapy-redis's pause-and-resume crawling feature can no longer be fully used.

Analyzing the scrapy-redis queue

Looking at the scrapy-redis queue module (scrapy_redis/queue.py), all we need to do is rewrite its queue class.


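For reference, the default Redis-backed PriorityQueue looks roughly like this (paraphrased from the scrapy-redis source; note that every push and every pop is a network round-trip to Redis):

class PriorityQueue(Base):
    """Per-spider priority queue backed by a Redis sorted set."""

    def __len__(self):
        return self.server.zcard(self.key)

    def push(self, request):
        data = self._encode_request(request)
        score = -request.priority
        # One ZADD command over the network per request
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        # Atomically read and remove the highest-priority entry
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])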

Rewriting the Base class

from scrapy.squeues import LifoMemoryQueue
# Note: newer Scrapy versions moved these helpers to scrapy.utils.request
from scrapy.utils.reqser import request_to_dict, request_from_dict
from scrapy_redis import picklecompat

class Base(object):
    """Per-spider base queue class"""

    def __init__(self, server, spider, key, serializer=None):
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer does not implement 'dumps' function: %r"
                            % serializer)

        # Kept only for signature compatibility with the scrapy-redis
        # scheduler; the in-memory queue never touches Redis.
        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        raise NotImplementedError
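
A serializer only has to expose loads and dumps, which is why the constructor checks for both. The default scrapy_redis.picklecompat is just a thin pickle wrapper, roughly:

import pickle

def loads(s):
    return pickle.loads(s)

def dumps(obj):
    # protocol=-1 selects the highest pickle protocol available
    return pickle.dumps(obj, protocol=-1)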

The in-memory queue

class SpiderQueue(Base):
    def __init__(self, *args, **kwargs):
        self.queues = {}
        self.qfactory = LifoMemoryQueue    # Scrapy's native in-memory (LIFO) queue
        self.curprio = None
        super(SpiderQueue, self).__init__(*args, **kwargs)

    def __len__(self):
        """Return the length of the queue"""
        return sum(len(x) for x in self.queues.values()) if self.queues else 0

    def push(self, request):
        # a = time.time()
        data = self._encode_request(request)
        priority = -request.priority
        if priority not in self.queues:
            self.queues[priority] = self.qfactory()
        q = self.queues[priority]
        q.push(data)
        if self.curprio is None or priority < self.curprio:
            self.curprio = priority
        # logging.info(f'enqueue took: {time.time()-a}')

    def pop(self, timeout=0):
        # a = time.time()
        if self.curprio is None:
            return
        q = self.queues[self.curprio]
        m = q.pop()
        if len(q) == 0:
            # b = time.time()
            del self.queues[self.curprio]
            prios = [p for p, q in self.queues.items() if len(q) > 0]
            self.curprio = min(prios) if prios else None
            # logging.info(f'priority-pointer reset took: {time.time() - b}  dequeue took: {time.time() - a}')
        return self._decode_request(m)
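
A quick sanity check of the priority behaviour (the spider stub and URLs below are hypothetical; server can be None because the in-memory queue never uses it):

from scrapy import Request

class FakeSpider:
    name = 'demo'  # hypothetical stub; the queue only reads .name

queue = SpiderQueue(server=None, spider=FakeSpider(), key='%(spider)s:requests')
queue.push(Request('http://example.com/low'))                  # default priority 0
queue.push(Request('http://example.com/high', priority=10))
assert len(queue) == 2
assert queue.pop().url == 'http://example.com/high'  # higher priority pops first
assert queue.pop().url == 'http://example.com/low'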

Configuring settings

SCHEDULER_QUEUE_CLASS = 'path.to.your.module.SpiderQueue'  # no trailing comma, or the value becomes a tuple
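
For context, the scrapy-redis scheduler loads whatever SCHEDULER_QUEUE_CLASS points to and instantiates it with its usual arguments, roughly like this (paraphrased from scrapy_redis/scheduler.py), which is why the rewritten queue keeps the same constructor signature:

from scrapy.utils.misc import load_object

# inside Scheduler.open(), roughly:
self.queue = load_object(self.queue_cls)(
    server=self.server,
    spider=spider,
    key=self.queue_key % {'spider': spider.name},
    serializer=self.serializer,
)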