上一讲简单分布式爬虫——第一弹:了解分布式爬虫结构
我们讲过,masterSpider的作用是协调各节点spider之间的工作,包括任务分发、URL管理、结果回收等,那么这一讲,我们就来逐步实现masterSpider的各项功能。Ready,go!
首先,由于我们打造的是分布式爬虫,所以需要在多台主机之间进行通信,在这里我们通过python的multiprocessing模块来实现,该模块中有一个managers的子模块支持把多进程分布到多台机器上,主从机之间通过任务队列进行联系,为了实现爬虫功能,
1、我们需要设置几个队列,分别是:任务队列task_queue、结果队列result_queue、结果处理队列conn_queue、数据存储队列store_queue。
2、将上面创建的队列(主要是任务队列和结果队列)在网络上注册,以便其他主机能够在网络上发现他们,注册后获得网络队列,相当于本地队列的映像。
3、创建一个Basemanager的实例manager,绑定端口和验证口令。
4、启动manager实例,监管信息通道。
5、通过管理实例的方法获得网络访问的Queue对象,即把网络队列实体化为可用的本地队列。
5、创建任务到“本地队列”,自动上传至网络队列,分配给网络上的其他主机进行处理。
下面实现这一过程:
# -*- coding: utf-8 -*-
def start_Manager(self, task_queue, result_queue):
'''
创建分布式爬虫管理器
:param task_queue: 任务队列
:param result_queue: result队列
:return: manager对象
'''
def _get_task():
return task_queue
def _get_result():
return result_queue
# 在网络上注册两个管理队列
BaseManager.register('get_task_queue', callable=_get_task)
BaseManager.register('get_result_queue', callable=_get_result)
# 监听主机 端口,设置身份认证口令
manager = BaseManager(address=('127.0.0.1', 10001), authkey='python'.encode('utf-8'))
return manager
以上实现了分布式爬虫的控制管理器,接下来实现URL管理器,还是先明确一下URL管理器的任务:
URL管理器首先需要两个集合(使用集合是为了利用集合元素不重复的特性实现url自动去重):待爬取url集合和已爬取url集合。URL管理器需要完成添加新url到未爬取url集合、添加爬过的url到已爬取url集合、判断是否有未爬取url、取一个未爬取url、存储爬虫进度等。下面来实现:为方便管理,创建URL管理器类:
## UrlManager.py
# -*- coding: utf-8 -*-
try:
import cPickle as pickle
except:
import pickle
import hashlib
class UrlManager(object):
'''
url管理器:
两个url集合(集合可以自动去重):
未爬取url: new_urls
已爬取url: old_urls
七个接口:
判断是否有待爬取url: has_new_url()
添加新url到集合: add_new_url(url)、add_new_urls(urls)
取出一个url: get_new_url()
获取未爬取url集合大小: new_url_size()
获取已爬取url集合大小: old_url_size()
进度保存: save_procese(path, data)
进度加载: load_process(path)
'''
def __init__(self):
self.new_urls = self.load_process('new_urls.txt') # 未爬取URL集合
self.old_urls = self.load_process('old_urls.txt') # 已爬取URL集合
def has_new_url(self):
'''
判断是否有待爬取的url
:return: True or False
'''
return self.new_url_size() != 0
def get_new_url(self):
'''
获取一个未爬取的url
:return: 一个未爬取的url
'''
new_url = self.new_urls.pop()
m = hashlib.md5()
m.update(new_url.encode('utf-8'))
# 为了减少存储量,存储已爬取的url的md5而不是url
self.old_urls.add(m.hexdigest())
return new_url
def add_new_url(self, url):
'''
添加一个新url到未爬取集合
:param url: 单个url
:return: 无
'''
if url is None:
return
m = hashlib.md5()
m.update(url.encode('utf-8'))
url_md5 = m.hexdigest()
if url not in self.new_urls and url_md5 not in self.old_urls:
self.new_urls.add(url)
def add_old_url(self, url):
if url is None:
return
m = hashlib.md5()
m.update(url.encode('utf-8'))
url_md5 = m.hexdigest()
if url_md5 not in self.old_urls:
self.old_urls.add(url)
def add_new_urls(self, urls):
'''
添加多个新url到未爬取的url集合
:param urls:多个url
:return: 无
'''
if urls is None or len(urls)==0:
return
for url in urls:
self.add_new_url(url)
def add_old_urls(self, urls):
'''
添加多个新url到未爬取的url集合
:param urls:多个url
:return: 无
'''
if urls is None or len(urls)==0:
return
for url in urls:
self.add_old_url(url)
def new_url_size(self):
'''
获取未爬取url集合大小
:return: 未爬取url集合大小
'''
return len(self.new_urls)
def old_url_size(self):
'''
获取已爬取url集合大小
:return: 已爬取url集合大小
'''
return len(self.old_urls)
def save_process(self, path, data):
'''
进度保存
:param path: 保存文件路径
:param data: 数据
:return:
'''
print('[!]保存进度:%s' % path)
with open(path, 'wb') as f:
pickle.dump(data, f) # 使用数据持久存储模块pickle
def load_process(self, path):
'''
从本地文件加载爬虫进度
:param path: 本地文件路径
:return:
'''
print('[+]从文件加载进度:%s' % path)
try:
with open(path, 'rb') as f:
data = pickle.load(f)
return data
except:
print('[!]无进度文件,创建:%s' % path)
return set()
完成上面两步之后,接下来就重点实现masterSpider的任务了:
1、url管理任务
2、结果分析
3、数据存储
每项任务我们单独开一个进程来负责。
# url管理任务
def url_manager_process(self, task_queue, conn_queue, root_url):
'''
url管理器进程
:param task_queue: task队列
:param conn_queue: 结果处理队列
:param root_url: 起始url
:return:
'''
url_manager = UrlManager()
url_manager.add_new_url(root_url)
while True:
while url_manager.has_new_url():
# 从url管理器获取新的url
new_url = url_manager.get_new_url()
# 将url分发下去
url_queue.put(new_url)
# 添加爬虫结束条件
if url_manager.old_url_size() > 1000:
# 通知节点停止工作
url_queue.put('end')
print('控制节点发起停止命令')
# 关闭管理节点,存储爬虫状态
url_manager.save_process('new_urls.txt', url_manager.new_urls)
url_manager.save_process('old_urls.txt', url_manager.old_urls)
return
# 添加新url到url管理器
try:
if not conn_queue.empty():
urls = conn_queue.get()
url_manager.add_new_urls(urls)
except BaseException as e:
# 休息一会儿
time.sleep(0.1)
# 结果分析任务
def result_process(self, result_queue, conn_queue, store_queue):
'''
# 结果分析进程
:param result_queue:
:param conn_queue:
:param store_queue:
:return:
'''
while True:
try:
if not result_queue.empty():
content = result_queue.get(True)
if content['new_urls'] == 'end':
print('结果分析进程收到结束命令')
store_queue.put('end')
return
conn_queue.put(content['new_urls'])
store_queue.put(content['data'])
else:
# 休息一会儿
time.sleep(0.1)
except BaseException as e:
# 休息一会儿
time.sleep(0.1)
# 数据存储任务
def store_process(self, store_queue):
'''
# 结果存储进程
:param store_queue:
:return:
'''
output = DataOutPut()
while True:
if not store_queue.empty():
data = store_queue.get()
if data == 'end':
print('存储进程接到结束命令')
output.output_end(output.filepath)
return
output.store_data(data)
else:
# 休息一会儿
time.sleep(0.1)
为方便管理,我们可以将以上任务包括start_manager方法一起做成一个Manager管理类,于是就有:
## SpiderManager.py
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue
from DataOutPut import DataOutPut
from UrlManager import UrlManager
import time
class SpiderManager(object):
def start_Manager(self, task_queue, result_queue):
pass #代码省略
def url_manager_process(self, task_queue, conn_queue, root_url):
pass #代码省略
def result_process(self, result_queue, conn_queue, store_queue):
pass #代码省略
def store_process(self, store_queue):
pass #代码省略
if __name__ == '__main__':
# 初始化队列
task_queue = Queue()
result_queue = Queue()
store_queue = Queue()
conn_queue = Queue()
# 创建分布式管理器
node = NodeManager()
manager = node.start_Manager(task_queue, result_queue)
# 创建url管理进程、数据提取进程、数据存储进程
url_manager_process = Process(target=node.url_manager_process, args=(task_queue, conn_queue, 'http://xxx',))
result_process = Process(target=node.result_process, args=(result_queue, conn_queue, store_queue,))
store_process = Process(target=node.store_process, args=(store_queue,))
# 启动各个进程和分布式管理器
url_manager_process.start()
result_process.start()
store_process.start()
manager.get_server().serve_forever()
最后就是数据存储,可以存储到本地文本文件或者数据库中,这里将数据以.html格式进行存储:
## DataOutPut.py
# -*- coding: utf-8 -*-
import codecs
import time
class DataOutPut(object):
'''
数据存储器:
内存存储: store_data(data)
文件存储: output_data(path)
'''
def __init__(self):
self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
self.output_head(self.filepath)
self.datas = []
def store_data(self, data):
if data is None:
return
self.datas.append(data)
if len(self.datas)>10:
self.output_html(self.filepath)
def output_head(self, path):
'''
将HTML头写进去
:return:
'''
fout=codecs.open(path,'w',encoding='utf-8')
fout.write("<html>")
fout.write("<body>")
fout.write("<table>")
fout.close()
def output_html(self,path):
'''
将数据写入HTML文件中
:param path: 文件路径
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
for data in self.datas:
fout.write("<tr>")
fout.write("<td>%s</td>"%data['url'])
fout.write("<td>%s</td>"%data['title'])
fout.write("<td>%s</td>"%data['summary'])
fout.write("</tr>")
self.datas=[]
fout.close()
def output_end(self,path):
'''
输出HTML结束
:param path: 文件存储路径
:return:
'''
fout=codecs.open(path,'a',encoding='utf-8')
fout.write("</table>")
fout.write("</body>")
fout.write("</html>")
fout.close()
以上就是分布式爬虫——第二弹:masterSpider的实现的全部内容。
下一讲:分布式节点爬虫的实现
参考资料:《Python爬虫开发与项目实战》