简单分布式爬虫——第二弹:masterSpider的实现

上一讲简单分布式爬虫——第一弹:了解分布式爬虫结构
我们讲过,masterSpider的作用是协调各节点spider之间的工作,包括任务分发、URL管理、结果回收等,那么这一讲,我们就来逐步实现masterSpider的各项功能。Ready,go!

首先,由于我们打造的是分布式爬虫,所以需要在多台主机之间进行通信,在这里我们通过python的multiprocessing模块来实现,该模块中有一个managers的子模块支持把多进程分布到多台机器上,主从机之间通过任务队列进行联系,为了实现爬虫功能,
1、我们需要设置几个队列,分别是:任务队列task_queue、结果队列result_queue、结果处理队列conn_queue、数据存储队列store_queue。
2、将上面创建的队列(主要是任务队列和结果队列)在网络上注册,以便其他主机能够在网络上发现他们,注册后获得网络队列,相当于本地队列的映像。
3、创建一个Basemanager的实例manager,绑定端口和验证口令。
4、启动manager实例,监管信息通道。
5、通过管理实例的方法获得网络访问的Queue对象,即把网络队列实体化为可用的本地队列。
5、创建任务到“本地队列”,自动上传至网络队列,分配给网络上的其他主机进行处理。
下面实现这一过程:

# -*- coding: utf-8 -*-

def start_Manager(self, task_queue, result_queue):
        '''
        创建分布式爬虫管理器
        :param task_queue:   任务队列
        :param result_queue: result队列
        :return:             manager对象
        '''
        def _get_task():
            return task_queue
        def _get_result():
            return result_queue
        # 在网络上注册两个管理队列
        BaseManager.register('get_task_queue', callable=_get_task)
        BaseManager.register('get_result_queue', callable=_get_result)
        # 监听主机 端口,设置身份认证口令
        manager = BaseManager(address=('127.0.0.1', 10001), authkey='python'.encode('utf-8'))
        return manager

以上实现了分布式爬虫的控制管理器,接下来实现URL管理器,还是先明确一下URL管理器的任务:
URL管理器首先需要两个集合(使用集合是为了利用集合元素不重复的特性实现url自动去重):待爬取url集合和已爬取url集合。URL管理器需要完成添加新url到未爬取url集合、添加爬过的url到已爬取url集合、判断是否有未爬取url、取一个未爬取url、存储爬虫进度等。下面来实现:为方便管理,创建URL管理器类:

## UrlManager.py
# -*- coding: utf-8 -*-
try:
    import cPickle as pickle
except:
    import pickle
import hashlib
class UrlManager(object):
    '''
    url管理器:
        两个url集合(集合可以自动去重):
            未爬取url: new_urls
            已爬取url: old_urls
        七个接口:
            判断是否有待爬取url:    has_new_url()
            添加新url到集合:        add_new_url(url)、add_new_urls(urls)
            取出一个url:            get_new_url()
            获取未爬取url集合大小:  new_url_size()
            获取已爬取url集合大小:  old_url_size()
            进度保存:               save_procese(path, data)
            进度加载:               load_process(path)
    '''
    def __init__(self):
        self.new_urls = self.load_process('new_urls.txt')  # 未爬取URL集合
        self.old_urls = self.load_process('old_urls.txt')  # 已爬取URL集合
    def has_new_url(self):
        '''
        判断是否有待爬取的url
        :return: True or False
        '''
        return self.new_url_size() != 0
    def get_new_url(self):
        '''
        获取一个未爬取的url
        :return: 一个未爬取的url
        '''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode('utf-8'))
        # 为了减少存储量,存储已爬取的url的md5而不是url
        self.old_urls.add(m.hexdigest())
        return new_url
    def add_new_url(self, url):
        '''
        添加一个新url到未爬取集合
        :param url: 单个url
        :return:    无
        '''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)
    def add_old_url(self, url):
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()
        if url_md5 not in self.old_urls:
            self.old_urls.add(url)
    def add_new_urls(self, urls):
        '''
        添加多个新url到未爬取的url集合
        :param urls:多个url
        :return:    无
        '''
        if urls is None or len(urls)==0:
            return
        for url in urls:
            self.add_new_url(url)
    def add_old_urls(self, urls):
        '''
        添加多个新url到未爬取的url集合
        :param urls:多个url
        :return:    无
        '''
        if urls is None or len(urls)==0:
            return
        for url in urls:
            self.add_old_url(url)
    def new_url_size(self):
        '''
        获取未爬取url集合大小
        :return: 未爬取url集合大小
        '''
        return len(self.new_urls)
    def old_url_size(self):
        '''
        获取已爬取url集合大小
        :return: 已爬取url集合大小
        '''
        return len(self.old_urls)
    def save_process(self, path, data):
        '''
        进度保存
        :param path:    保存文件路径
        :param data:    数据
        :return:
        '''
        print('[!]保存进度:%s' % path)
        with open(path, 'wb') as f:
            pickle.dump(data, f)    # 使用数据持久存储模块pickle
    def load_process(self, path):
        '''
        从本地文件加载爬虫进度
        :param path:    本地文件路径
        :return:
        '''
        print('[+]从文件加载进度:%s' % path)
        try:
            with open(path, 'rb') as f:
                data = pickle.load(f)
                return data
        except:
            print('[!]无进度文件,创建:%s' % path)
        return set()

完成上面两步之后,接下来就重点实现masterSpider的任务了:
1、url管理任务
2、结果分析
3、数据存储
每项任务我们单独开一个进程来负责。

# url管理任务
    def url_manager_process(self, task_queue, conn_queue, root_url):
        '''
        url管理器进程
        :param task_queue:   task队列
        :param conn_queue:   结果处理队列
        :param root_url:     起始url
        :return:
        '''
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while url_manager.has_new_url():
                # 从url管理器获取新的url
                new_url = url_manager.get_new_url()
                # 将url分发下去
                url_queue.put(new_url)
                # 添加爬虫结束条件
                if url_manager.old_url_size() > 1000:
                    # 通知节点停止工作
                    url_queue.put('end')
                    print('控制节点发起停止命令')
                    # 关闭管理节点,存储爬虫状态
                    url_manager.save_process('new_urls.txt', url_manager.new_urls)
                    url_manager.save_process('old_urls.txt', url_manager.old_urls)
                    return

            # 添加新url到url管理器
            try:
                if not conn_queue.empty():
                    urls = conn_queue.get()
                    url_manager.add_new_urls(urls)
            except BaseException as e:
                # 休息一会儿
                time.sleep(0.1)

# 结果分析任务
    def result_process(self, result_queue, conn_queue, store_queue):
        '''
        # 结果分析进程
        :param result_queue:
        :param conn_queue:
        :param store_queue:
        :return:
        '''
        while True:
            try:
                if not result_queue.empty():
                    content = result_queue.get(True)
                    if content['new_urls'] == 'end':
                        print('结果分析进程收到结束命令')
                        store_queue.put('end')
                        return

                    conn_queue.put(content['new_urls'])
                    store_queue.put(content['data'])
                else:
                    # 休息一会儿
                    time.sleep(0.1)
            except BaseException as e:
                # 休息一会儿
                time.sleep(0.1)

# 数据存储任务
    def store_process(self, store_queue):
        '''
        # 结果存储进程
        :param store_queue:
        :return:
        '''
        output = DataOutPut()
        while True:
            if not store_queue.empty():
                data = store_queue.get()
                if data == 'end':
                    print('存储进程接到结束命令')
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                # 休息一会儿
                time.sleep(0.1)

为方便管理,我们可以将以上任务包括start_manager方法一起做成一个Manager管理类,于是就有:

## SpiderManager.py
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue
from DataOutPut import DataOutPut
from UrlManager import UrlManager
import time

class SpiderManager(object):
    def start_Manager(self, task_queue, result_queue):
        pass #代码省略

    def url_manager_process(self, task_queue, conn_queue, root_url):
        pass #代码省略
    
    def result_process(self, result_queue, conn_queue, store_queue):
        pass #代码省略

    def store_process(self, store_queue):
        pass #代码省略

if __name__ == '__main__':
    # 初始化队列
    task_queue = Queue()
    result_queue = Queue()
    store_queue = Queue()
    conn_queue = Queue()

    # 创建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(task_queue, result_queue)
    # 创建url管理进程、数据提取进程、数据存储进程
    url_manager_process = Process(target=node.url_manager_process, args=(task_queue, conn_queue, 'http://xxx',))
    result_process = Process(target=node.result_process, args=(result_queue, conn_queue, store_queue,))
    store_process = Process(target=node.store_process, args=(store_queue,))
    # 启动各个进程和分布式管理器
    url_manager_process.start()
    result_process.start()
    store_process.start()
    manager.get_server().serve_forever()

最后就是数据存储,可以存储到本地文本文件或者数据库中,这里将数据以.html格式进行存储:

## DataOutPut.py
# -*- coding: utf-8 -*-
import codecs
import time

class DataOutPut(object):
    '''
    数据存储器:
        内存存储: store_data(data)
        文件存储: output_data(path)
    '''

    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas)>10:
            self.output_html(self.filepath)

    def output_head(self, path):
        '''
        将HTML头写进去
        :return:
        '''
        fout=codecs.open(path,'w',encoding='utf-8')
        fout.write("<html>")
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    def output_html(self,path):
        '''
        将数据写入HTML文件中
        :param path: 文件路径
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            fout.write("<td>%s</td>"%data['url'])
            fout.write("<td>%s</td>"%data['title'])
            fout.write("<td>%s</td>"%data['summary'])
            fout.write("</tr>")
        self.datas=[]
        fout.close()

    def output_end(self,path):
        '''
        输出HTML结束
        :param path: 文件存储路径
        :return:
        '''
        fout=codecs.open(path,'a',encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()

以上就是分布式爬虫——第二弹:masterSpider的实现的全部内容。
下一讲:分布式节点爬虫的实现

参考资料:《Python爬虫开发与项目实战》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352