增量爬取和去重
增量爬取
当一个站点有数据更新的时候,需要进行增量爬取,通常有以下集中情况
某个特定页面数据更新
新增了页面
情况1的时候,我们对此特定页面的内容做哈希,当然要去除动态变化的那一部分,比如有的页面有验证码或者日期,程序定期执行,在执行的最开始检测此页面的哈希值跟上次抓取是否有变化,如果有变化就开始抓取。
情况2的时候,我们会对页面入口内容做哈希,并且存储分页面的URL哈希值,如果页面入口哈希值发生变化,获取新增的页面url列表,在这里需要用到url的去重,和数据去重类似,采用redis集合类型处理。
redis集合类型不允许添加重复的数据,当添加重复的时候时,返回0,并且添加失败。我们将所有的url list存入redis集合,当页面入口变化时,进行页面url去重,只抓取新增的页面。
爬取结果去重
结果去重常用的有两种方法:
- 布隆过滤器
- redis集合
布隆过滤器
其中布隆过滤器是通过写文件的方式,多个进程使用需要添加同步和互斥,较为繁琐,不推荐多线程/进程的时候使用,另外写文件是磁盘I/O操作,耗费时间长,可以累积到一定数量再一次写入,或者利用上下文管理器在程序结束或异常退出时一次性写入。
class Spider(object):
def __init():
# 布容过滤器初始化
self.burongname = 'test.bl'
if not os.path.isfile(self.burongname):
self.bl = BloomFilter(capacity=100000, error_rate=0.000001)
else:
with open(self.burongname, 'rb') as f:
self.bl = BloomFilter.fromfile(f)
def __enter__(self):
u"""
上下文管理器进入入口
"""
return self
def __exit__(self, *args):
u"""
上下文管理器,退出出口
"""
if self.conn is not None:
self.conn.close()
with open(self.burongname, 'wb') as f:
self.fingerprints.tofile(f)
def get_infos(self):
"""
抓取主函数
"""
# 布隆过滤器使用部分, x为抓取到得数据
x = json.dumps(i)
if x not in self.bl:
self.bl.add(x)
if __name__ == '__main__':
with Spider() as MSS:
MSS.get_infos()
上下文管理器,在主函数执行之前执行 def enter ,在程序运行结束或异常退出时执行def exit, 上下文管理器还可以用来统计程序执行的时间。
redis集合
使用redis集合去重能够支持多线程多进程.
利用redis集合无重复数据的特点,在redis建立集合,往其中添加数据的sha1值,添加成功返回1,表示无重复,添加失败返回0,表示集合中已经有重复数据
使用: 步骤:1. 建立redis连接池 2. 重复检查
下面的例子是接口,并提供example。
[Redis]
server=192.168.0.100
pass=123@123
# -*- coding:utf-8 -*-
"""
File Name : 'distinct'.py
Description:
Author: 'chengwei'
Date: '2016/6/2' '11:45'
python: 2.7.10
"""
import sys
import hashlib
import os
import codecs
import ConfigParser
import redis
reload(sys)
sys.setdefaultencoding('utf-8')
"""
利用redis的集合不允许添加重复元素来进行去重
"""
def example():
pool, r = redis_init()
temp_str = "aaaaaaaaa"
result = check_repeate(r, temp_str, 'test:test')
if result == 0:
# do what you want to do
print u"重复"
else:
# do what you want to do
print u"不重复"
redis_close(pool)
def redis_init(parasecname="Redis"):
"""
初始化redis
:param parasecname:
:return: redis连接池
"""
cur_script_dir = os.path.split(os.path.realpath(__file__))[0]
cfg_path = os.path.join(cur_script_dir, "db.conf")
cfg_reder = ConfigParser.ConfigParser()
secname = parasecname
cfg_reder.readfp(codecs.open(cfg_path, "r", "utf_8"))
redis_host = cfg_reder.get(secname, "server")
redis_pass = cfg_reder.get(secname, "pass")
# redis
pool = redis.ConnectionPool(host=redis_host, port=6379, db=0, password=redis_pass)
r = redis.Redis(connection_pool=pool)
return pool, r
def sha1(x):
sha1obj = hashlib.sha1()
sha1obj.update(x)
hash_value = sha1obj.hexdigest()
return hash_value
def check_repeate(r, check_str, set_name):
"""
向redis集合中添加元素,重复则返回0,不重复则添加成功,并返回1
:param r:redis连接
:param check_str:被添加的字符串
:param set_name:项目所使用的集合名称,建议如下格式:”projectname:task_remove_repeate“
:return:
"""
hash_value = sha1(check_str)
result = r.sadd(set_name, hash_value)
return result
def redis_close(pool):
"""
释放redis连接池
:param pool:
:return:
"""
pool.disconnect()
if __name__ == '__main__':
example()