scrapy源码解析前戏,Twisted框架学习笔记

先看看在scrapy源码里面对于twisted框架的部分使用,主要是reactor这个充当事件循环的模块


image.png

创建多个task,让异步效果更加明显,函数不改,只改下面的代码


image.png

这10个请求就是并发的
如果存在yield也是可以的,如图


image.png

特殊的‘特殊的socket对象’
from twisted.internet import defer

from twisted.internet import defer#defer.Deferred(特殊的socket对象),不会发请求
from twisted.web.client import getPage#socket对象(如果下载完成...)
from twisted.internet import reactor#事件循坏

def response(content):
    print(content)
@defer.inlineCallbacks#添加到时事件循环的第一步
def task():
    url = 'http://www.baidu.com'
    d1=getPage(url.encode('utf-8'))
    d1.addCallback(response)

    url = 'http://www.baidu.com'
    d2 = getPage(url.encode('utf-8'))
    d2.addCallback(response)

    url = 'http://www.baidu.com'
    d3 = getPage(url.encode('utf-8'))
    d3.addCallback(response)
    '''一下创建三个socket,发送三个请求'''
    yield defer.Deferred()
    # url = 'http://www.baidu.com'
    # d = getPage(url.encode('utf-8'))
    # d.addCallback(response)
    # yield d

def done(*args,**kwargs):#事件循环完成时候结束该异步处理
    reactor.stop()
li=[]
# for i in range(10):
#     d=task()#相当于创建10个socket对象,添加到列表里面
#     li.append(d)
d=task()
dd=defer.DeferredList([d,])#列表里面所有任务都完成了,才终止
dd.addBoth(done)#成功获取返回值之后就触发停止函数
reactor.run()

运行结果如图



程序一直没有结束
完整代码

from twisted.internet import defer#defer.Deferred(特殊的socket对象),不会发请求
from twisted.web.client import getPage#socket对象(如果下载完成...)
from twisted.internet import reactor#事件循坏

_close=None
count=0
def response(content):
    global count
    count += 1
    print(content)
    if count==3:
        _close.callback(None)


@defer.inlineCallbacks#添加到时事件循环的第一步
def task():
    '''
    每个爬虫的开始:starts_requests
    :return:
    '''
    url = 'http://www.baidu.com'
    d1=getPage(url.encode('utf-8'))
    d1.addCallback(response)

    url = 'http://www.baidu.com'
    d2 = getPage(url.encode('utf-8'))
    d2.addCallback(response)

    url = 'http://www.baidu.com'
    d3 = getPage(url.encode('utf-8'))
    d3.addCallback(response)
    '''一下创建三个socket,发送三个请求'''
    global _close
    _close=defer.Deferred()

    yield _close#当三个请求都返回了,就执行这个函数
    # yield defer.Deferred()
    # url = 'http://www.baidu.com'
    # d = getPage(url.encode('utf-8'))
    # d.addCallback(response)
    # yield d

def done(*args,**kwargs):#事件循环完成时候结束该异步处理
    reactor.stop()
li=[]
# for i in range(10):
#     d=task()#相当于创建10个socket对象,添加到列表里面
#     li.append(d)
spider1=task()
spider2=task()
dd=defer.DeferredList([spider1,spider2])#列表里面所有任务都完成了,才返回
dd.addBoth(done)#成功获取返回值之后就触发停止函数
reactor.run()





自定义scrapy框架

from twisted.internet import reactor   # 事件循环(终止条件,所有的socket都已经移除)
from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...)
from twisted.internet import defer     # defer.Deferred 特殊的socket对象 (不会发请求,手动移除)

class Request(object):

    def __init__(self,url,callback):
        self.url = url
        self.callback = callback
class HttpResponse(object):

    def __init__(self,content,request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content,encoding='utf-8')


class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com','http://www.bing.com',]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下载的页面
        yield Request('http://www.cnblogs.com',callback=self.parse)

import queue
Q = queue.Queue()

class Engine(object):

    def __init__(self):
        self._close = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        rep = HttpResponse(content,request)
        result = request.callback(rep)
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                Q.put(req)



    def _next_request(self):
        """
        去取request对象,并发送请求
        最大并发数限制
        :return:
        """
        print(self.crawlling,Q.qsize())
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)
                self.crawlling.append(req)
                d = getPage(req.url.encode('utf-8'))
                # 页面下载完成,get_response_callback,调用用户spider中定义的parse方法,并且将新请求添加到调度器
                d.addCallback(self.get_response_callback,req)
                # 未达到最大并发数,可以再去调度器中获取Request
                d.addCallback(lambda _:reactor.callLater(0, self._next_request))
            except Exception as e:
                print(e)
                return

    @defer.inlineCallbacks
    def crawl(self,spider):
        # 将初始Request对象添加到调度器
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break

        # 去调度器中取request,并发送请求
        # self._next_request()
        reactor.callLater(0, self._next_request)

        self._close = defer.Deferred()
        yield self._close

spider = ChoutiSpider()


_active = set()
engine = Engine()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda a:reactor.stop())

reactor.run()



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容