Scrapy_Redis抓取百度贴吧、微信文章、微信公众号、域名等通用式爬虫(一)

image.png

基于公司最近的业务,需要分析网络舆情,得到较为准确的信息,需要开发一款通用式爬虫,支持贴吧、微信、百度、域名、指定贴吧、指定关键字等抓取。本人故开发此项爬虫代码。

1.开发依赖环境 scrapy_redis+chrome+selenium(phantomjs也可以)

2.开发环境安装(详情见我的上一篇文章)

3.架构流程详解

  1. 获取需要抓取的关键字、域名、指定贴吧名字等建立一个mysql初始任务数据表,然后将这些数据插入到mysql中。详情见下图:


    image.png

    image.png

可以根据自己的需求进行配置。
2.爬虫启动过程中会去mysql拿下发任务进行抓取,crawled=0表示未抓取的任务,爬虫成功启动后会将crawled字段置为1表示任务开始抓取。当此任务结束时,爬虫在调用close_spider函数时,会将crawled状态更新为2,表示此项任务抓取结束。
st_status表示动静态开关切换,0表示静态、1表示动态。
crawltasktime:任务下发时间。
engine:1.表示微信搜狗引擎、2.表示百度引擎、3.表示贴吧 0.表示抓取域名。
keyword:表示抓取指定关键字。
domain: 表示抓取的指定域名。
depth:爬虫抓取深度限制。
width:爬虫抓取宽度限制。
accesspoint:CMWAP、CMNET爬取模式,多种模式中用“,”分隔,默认为CMNET。
totalpages:爬虫抓取页面数量限制。
CrawlFrequency:对任务爬取的频率 0表示爬取一次,1表示无线循环抓取。
cycletime:爬虫爬取时间限制,单位秒
repeattimes:爬虫请求失败重复请求次数设置。
interval:爬虫重复拨测时间间隔。
company:任务所属公司设置。
3.将抓取的文本、图片的数据存入mongodb或者mysql数据库。
4.进行数据分析提取相关数据。

4.scheduler 代码开发详解

因为我们所有的任务都是使用的同一款爬虫,所以爬虫在没有请求的时候,我们不让他继续等待。故修改scrapy_redis源码,让爬虫在没有请求的情况下,自动等待15秒后关闭爬虫。避免浪费内存资源,节省空间!首先找到scrapy_redis的scheduler.py这个文件夹。这个文件里面的代码主要作用就是收集指纹和处理请求,保证一个请求按一定(settings配置中的规则)的规则去消耗。
/usr/local/lib/python2.7/dist- packages/scrapy_redis/scheduler.py这是源代码的路径。多的不说,直接上图。

# coding=utf-8
import redis
import importlib
import six
import datetime
import time
from scrapy.utils.misc import load_object
import os
from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """
    #++++
    lostGetRequest = 0

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider
        print 'spider.name is %s' %(spider.redis_key)
        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                #key=self.queue_key % {'spider': spider.name},
                key=self.queue_key % {'spider': spider.host},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                #key=self.dupefilter_key % {'spider': spider.name},
                key=self.dupefilter_key % {'spider': spider.host},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.lostGetRequest = 0
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        if request is None:
            self.lostGetRequest += 1
            print "request is None, lostGetRequest = {%s}, time = {%s}" %(self.lostGetRequest,datetime.datetime.now())
            if self.lostGetRequest >= 15:
                print "request is None, close spider."
                self.spider.crawler.engine.close_spider(self.spider, 'queue is empty')
        return request

    def has_pending_requests(self):
        return len(self) > 0

代码很简单,当spider被初始化时,同时会初始化一个对应的scheduler对象,这个调度器对象通过读取settings,配置好自己的调度容器queue和判重工具dupefilter。爬虫在运行过程过,会源源不断的通过引擎来和调度器协调工作,调度器将引擎分发的请求经过一系列的入队列、指纹收集等处理后,按settings中设置的爬取策略分发给下载中间件去下载。而这份代码我们主要设置了一个类变量用来进行计时。当next_request这个函数处理的请求不为空,我们就会重置他的状态为0,当请求为空,我们就要进行一个15秒的计时。当计时结束,引发异常调用close_spider将爬虫进行关闭。此代码还修改了scrapy_redis默认的dupfilter和request 两个key。

5.spider开发代码详解

# -*- coding: utf-8 -*-
import scrapy
from scrapy.http import Request, HtmlResponse
from bs4 import BeautifulSoup
import time
from urllib import urlencode
import re
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import redis
from redistest.settings import REDIS_PORT, REDIS_HOST
import hashlib
import random
import os
import urllib

def soup_text(body):
    try:
        soup = BeautifulSoup(body, 'lxml')
        for script in soup(["script", "style"]):
            script.extract()
        line = re.compile(r'\s+')
        line = line.sub(r'', soup.body.getText())
        #p2 = re.compile(u'[^\u4e00-\u9fa5]')  # 中GDAC\u4e00\u9fa5
        #str2 = p2.sub(r'', line)
        outStr = line.strip(',')
    except:
        outStr = ''
    return outStr

def rand5():
    randnum = ""
    for i in range(0, 5):
        randnum += str(random.randint(0, 9))
    return randnum

class BaiduSpider(scrapy.Spider):
    name = 'redistest'
    params = ''
    keyword = ''
    allowed_domains = []
    sousu = ''
    start_urls = []
    datadupefilter = ''
    filepath = ''

    def __init__(self, param=None, *args, **kwargs):
        super(BaiduSpider, self).__init__(*args, **kwargs)
        self.data_conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=8)
        self.yuqing = self.name+':item'
        self.datadupefilter = 'datadupefilter'
        self.params = eval(param)
        if self.params['keyword'] != '0':
            self.keyword = self.params['keyword'].decode('utf-8')
        print self.params
        self.sousu = self.params['engine']
        self.filepath = "/home/YuQing/scrapy_yuqing/content" 
        if not os.path.exists(self.filepath):
            os.mkdir(self.filepath)
        if self.sousu == 1:
            self.start_urls = ['http://weixin.sogou.com']
            self.params['redis_key'] = None
        elif self.sousu == 2:
            self.start_urls = ['https://www.baidu.com']
            self.params['redis_key'] = None
        elif self.sousu == 0:
            self.params['redis_key'] = None
            self.start_urls = ['https://tieba.baidu.com/f?ie=utf-8&kw=%s&fr=search&' % self.params['keyword']]
        else:
            if self.params['redis_key']:
                self.start_urls = [self.params['redis_key']]
        if self.params['crosssitecrawl'] == 0 and self.params['redis_key']:
            proto, rest = urllib.splittype(self.params['redis_key'])
            host, rest = urllib.splithost(rest)
            self.allowed_domains = [host.replace('www.','')]
        print self.start_urls,self.sousu,self.params['redis_key'],self.allowed_domains

    def make_requests_from_url(self, url):
        if self.params['st_status'] == 1:
            return Request(url, meta={'keyword': self.keyword, 'engine':self.sousu, 'phantomjs':True})
        else:
            return Request(url)

    def parse(self, response):
        # 判断页面的返回状态
        if int(response.status) >= 200 and int(response.status) < 400:
            if not self.params['redis_key'] and self.sousu:
                a_list = response.xpath('//h3/a/@href').extract()
                for url in a_list:
                    if url.startswith('http://') != True and url.startswith('https://') !=True:
                        url = response.urljoin(url)
                    yield scrapy.Request(url=url, meta={'url':response.url}, callback=self.parse_url)

                if response.meta.has_key('page') != True and self.sousu == 2:
                    flag = 1
                    for next_url in response.xpath('//div[@id="page"]/a/@href').extract():
                        if next_url.startswith('http://') != True and next_url.startswith('https://') !=True:
                            nextUrl = self.start_urls[0] + next_url
                            regex = 'pn=(\d+)'
                            page_number = re.compile(regex).search(nextUrl).group(1)
                            if page_number and flag:
                                flag = 0
                                # 抓取前50页
                                for page in range(10,500,10):
                                    next_page = 'pn=' + str(page)
                                    old_page = re.compile(regex).search(nextUrl).group()
                                    nextUrl = nextUrl.replace(old_page, next_page)
                                    yield scrapy.Request(url=nextUrl, meta={'page':page}, callback=self.parse)

                if response.meta.has_key('page') != True and self.sousu == 1:
                    flag = 1
                    for next_url in response.xpath('//div[@class="p-fy"]/a/@href').extract():
                        if next_url.startswith('http://') != True and next_url.startswith('https://') !=True:
                            nextUrl = self.start_urls[0] + '/weixin' + next_url
                            regex = 'page=(\d+)'
                            page_number = re.compile(regex).search(nextUrl).group(1)
                            if page_number and flag:
                                flag = 0
                                for page in range(2,3):
                                    next_page = 'page=' + str(page)
                                    old_page = re.compile(regex).search(nextUrl).group()
                                    nextUrl = nextUrl.replace(old_page, next_page)
                                    yield scrapy.Request(url=nextUrl, meta={'page':page}, callback=self.parse)
            # 支持贴吧抓取
            elif not self.params['redis_key'] and not self.sousu:
                for page in range(0, 200, 50):
                    pn = urlencode({'pn':page})
                    next_page = response.url + pn
                    yield scrapy.Request(next_page, callback=self.parse_tieba)
                    
            else:
                self.parse_url(response)
                print response.url
                a_list = response.xpath('//a/@href').extract()
                for linkstr in a_list:
                    if linkstr.startswith('http://') != True and linkstr.startswith('https://') !=True:
                        linkstr = response.urljoin(linkstr)
                    if 'about:blank' != linkstr and linkstr.lower().endswith('.rar') != True and linkstr.lower().endswith('.apk') != True and linkstr.startswith('tel') != True and linkstr.lower().endswith('.css') != True and linkstr.lower().endswith('.js') != True:
                        yield scrapy.Request(url=linkstr, meta={'url':response.url}, callback=self.parse)

    def parse_tieba(self, response):
        with open('aa', 'a') as f:
            f.write(response.url+'\n')
        regex = 'href="(.*?)"'
        a_list = re.compile(regex).findall(response.body)
        for url in a_list:
            if len(url) >= 5 and 'javascript' not in url and 'css' not in url and url.startswith('/p'):
                if url.startswith('http:') != True and url.startswith('https:') != True:
                    url = response.urljoin(url)
                yield scrapy.Request(url, meta={'url':response.url}, callback=self.parse_url)


    def parse_dupefilter(self, response):
        try:
            a_list = response.xpath("//a/@href").extract()
            data = soup_text(response)
            data = data + str(a_list)
        except Exception, e:
            data = str(e)
        print data
        if data:
            m = hashlib.md5()
            m.update(data)
            data_md5 = m.hexdigest()
            return data_md5
        else:
            return ''

    def parse_text(self, response):

        item = {}
        try:
            father_url = response.meta["url"]
        except:
            father_url = "''"
        try:
            item['title'] = response.xpath('//title/text()').extract_first().replace('\r\n','').replace('\n','')
        except:
            item['title'] = "''"
        item['url'] = response.url
        item['domain'] = ''
        item['crawl_time'] = time.strftime('%Y%m%d%H%M%S')
        item['keyword'] = ''
        item['Type_result'] = ''
        item['type'] = 'html'
        item['filename'] = 'yq_' + str(int(time.time())) + '_0' + str(rand5())+'.txt'
        item['referver'] = father_url
        item['like'] = ''
        item['transpond'] = ''
        item['comment'] = ''
        item['publish_time'] = ''
        return item

    def parse_url(self, response):
        # 以内容做指纹
        data_md5 = self.parse_dupefilter(response)
        if self.data_conn.sadd(self.datadupefilter, data_md5):
            content = soup_text(response.body)
            print content
            item = self.parse_text(response)
            self.data_conn.lpush(self.yuqing, item)
            yuqing_file = os.path.join(self.filepath, item['filename'])
            with open(yuqing_file, 'w') as b:
                b.write(content)

    def pang_bo(self, response):
        # 过略掉百度网页
        if 'baidu.com' not in response.url:
            item = self.parse_text(response)
            content = soup_text(response.body)
            if len(content) > 3000:
                content = content[:3000]
            body = item['url']+','+item['crawl_time']+','+item['title'].replace(',','') +','+content+'\n'
            filename = time.strftime('%Y%m%d%H')+'.csv'
            with open(filename, 'a') as f:
                f.write(body)

启动爬虫的时候我们会以 scrapy crawl spider -a="{}" 的形式,给他传递一个json字符串,在spider 的init中,我们继承了BaiduSpider原有的初始化,并重写我们需要的东西。在init方法中,我们提取到我们从外面传入的数据,并进行处理。连接数据库、引擎选择、跨域开关设置、redis_key配置等。
修改make_request_from_url这个函数,此函数会传递一个url参数给Request对象。在这个过程过,我们会根据传递的动静态开关,给Request对象添加一个meta字典。这样我们在middlewares中间件中,通过process_request方法,提取到该请求携带的参数,来启动selenium+chrome来进行抓取。process_request函数是一个类方法,它默认携带request和spider两个对象。我们可以通过request.meta来获取刚刚传递的数据,详情如下图:

image.png

接下来详细解说parse代码:

请查阅scrapy_redis抓取百度贴吧、微信文章、微信公众号、域名等通用式爬虫(二)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354