pyspider process和result部分源码分析

终于弄清楚,pyspider为什么重写on_result之后,调试的时候可以把数据插入数据库,而不重写的时候不行。

这一篇文章主要是记录process和result部分的内容。之后会通过这些内容改写一下数据库。

def run(self):
        '''Run loop'''
        logger.info("processor starting...")

        while not self._quit:
            try:
                task, response = self.inqueue.get(timeout=1)
                self.on_task(task, response)
                self._exceptions = 0
            except Queue.Empty as e:
                continue
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("processor exiting...")

process进程通过run开始,这段代码通过一个while循环,监听inqueue队列,然后把task,response在on_task运行。inqueue队列是processor2result队列,如果你用消息队列的话,你可以看到消息队列里面有这个队列。

def on_task(self, task, response):
        '''Deal one task'''
        start_time = time.time()
        response = rebuild_response(response)

        try:
            assert 'taskid' in task, 'need taskid in task'
            project = task['project']
            updatetime = task.get('project_updatetime', None)
            md5sum = task.get('project_md5sum', None)
            project_data = self.project_manager.get(project, updatetime, md5sum)
            assert project_data, "no such project!"
            if project_data.get('exception'):
                ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                      exception=project_data['exception'])
            else:
                #注意这里把爬虫实例执行并且把结果返回给队列,最后返回一个processresult对象
                ret = project_data['instance'].run_task(
                    project_data['module'], task, response)
        except Exception as e:
            logstr = traceback.format_exc()
            ret = ProcessorResult(logs=(logstr, ), exception=e)
        process_time = time.time() - start_time

        if not ret.extinfo.get('not_send_status', False):
            if ret.exception:
                track_headers = dict(response.headers)
            else:
                track_headers = {}
                for name in ('etag', 'last-modified'):
                    if name not in response.headers:
                        continue
                    track_headers[name] = response.headers[name]

            status_pack = {
                'taskid': task['taskid'],
                'project': task['project'],
                'url': task.get('url'),
                'track': {
                    'fetch': {
                        'ok': response.isok(),
                        'redirect_url': response.url if response.url != response.orig_url else None,
                        'time': response.time,
                        'error': response.error,
                        'status_code': response.status_code,
                        'encoding': getattr(response, '_encoding', None),
                        'headers': track_headers,
                        'content': response.text[:500] if ret.exception else None,
                    },
                    'process': {
                        'ok': not ret.exception,
                        'time': process_time,
                        'follows': len(ret.follows),
                        'result': (
                            None if ret.result is None
                            else utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
                        ),
                        'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
                        'exception': ret.exception,
                    },
                    'save': ret.save,
                },
            }
            if 'schedule' in task:
                status_pack['schedule'] = task['schedule']

            # FIXME: unicode_obj should used in scheduler before store to database
            # it's used here for performance.
            #把status信息放入status_queue,这个队列还不知道做什么用
            self.status_queue.put(utils.unicode_obj(status_pack))

        # FIXME: unicode_obj should used in scheduler before store to database
        # it's used here for performance.
        #如果有新的url放入这个队列
        if ret.follows:
            for each in (ret.follows[x:x + 1000] for x in range(0, len(ret.follows), 1000)):
                self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])

on_task比较长。
先看project_manager这个方法,获得project的信息,这里比较主要的是instance,是一个爬虫的handlerbase实例,可以调用run_task()方法,这块怎么来的以后再说吧,没时间了

project_data = self.project_manager.get(project, updatetime, md5sum)

然后是往队列里面扔东西,status_queue不知道干啥,newtask_queue应该是新的url队列,另外ret是processresult对象,以后看

 self.status_queue.put(utils.unicode_obj(status_pack))
self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])

run_task方法

def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

这里面有调用_run_func其实就是调用callback的那个函数,这里是执行采集的地方,

result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)

这里执行方法获得结果,如果是return的数据,执行on_result方法。

 def on_result(self, result):
        """Receiving returns from other callback, override me."""
        if not result:
            return
        assert self.task, "on_result can't outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

on_result把结果放在result_queue里面

大概process就干了这些

 def run(self):
        '''Run loop'''
        logger.info("result_worker starting...")

        while not self._quit:
            try:
                task, result = self.inqueue.get(timeout=1)
                self.on_result(task, result)
            except Queue.Empty as e:
                continue
            except KeyboardInterrupt:
                break
            except AssertionError as e:
                logger.error(e)
                continue
            except Exception as e:
                logger.exception(e)
                continue

        logger.info("result_worker exiting...")

result开始监听队列执行on_result

  def on_result(self, task, result):
        '''Called every result'''
        if not result:
            return
        if 'taskid' in task and 'project' in task and 'url' in task:
            logger.info('result %s:%s %s -> %.30r' % (
                task['project'], task['taskid'], task['url'], result))
            return self.resultdb.save(
                project=task['project'],
                taskid=task['taskid'],
                url=task['url'],
                result=result
            )
        else:
            logger.warning('result UNKNOW -> %.30r' % result)
            return

存入数据库,用project,taskid,url的表,就是默认的那个

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,673评论 18 139
  • 昨天的文让大家恶心了一把,今天我做点补偿哈~~科普一个真正的萌物——矮种马。 马中柯基,绝对萌神 看到这个标题是不...
    肖爷_族长阅读 8,941评论 4 15
  • 使用Javascript可以方便获得页面的参数信息,常用的几种如下: 设置或获取对象指定的文件名或路径 设置或获取...
    duJing阅读 305评论 0 0
  • 刚才吃早餐时,想出了10篇短篇小说,以下是题目,望您斧正! 1.《四岁毛孩心机重,嫌自己压力大,竟将压力甩给了亲哥...
    叔孙伯阅读 597评论 0 6