通过上篇文章,可以了解到代理池实现的具体思路,但公司业务需要,项目中要使用到性能更稳定的代理
推荐使用自己搭的服务或购买收费代理
根据需求,修改上篇文章的代理池实现即可满足需求,这里以芝麻代理为例
业务分析
最核心的部分当然修改获取免费代理的方法啦,在get_proxy的类ProxyGetter
中把所有的以proxy_开头的免费代理获取方法注释掉,添加芝麻接口的proxy_方法即可
当然芝麻的代理机制和免费代理机制也是不同的,具体表现为:
免费代理:量大,可用代理少
付费代理:量少,几乎获取到的代理都可用
业务上需要,个人使用的版本是芝麻长效代理,每个代理有效时间为25分钟到3小时,每天200个,在获取代理的方法中调用芝麻接口获取代理,每次只需要少量的几个(因为芝麻的代理质量比较好,几乎获取到的都是可用代理)
通过接口拿到代理之后做可用性检测,然后入库,因为每天可获取的代理量只有两百个,要保证24小时mongo中都有可用的代理,设计代理池的阈值范围为3~5个,保证每时每刻代理池都有3到5个代理
代理量分配计算:
理想情况下平均每个代理持续时间大概为(25+180)/ 2 = 102.5 分钟
200个代理分布到24小时: 200*102.5/24*60 =14.236...
理论上讲可以保证池中最大阈值为14个,保险起见把阈值调的更小来提高代理服务的稳定性,代理池容量要设置更小一些,因为付费的代理服务或多或少都有可能会有异常状况出现
这个是芝麻代理返回json的数据调用接口:
http://webapi.http.zhimacangku.com/getip?num=2&type=2&pro=&city=0&yys=0&port=1&pack=***&ts=1&ys=0&cs=0&lb=1&sb=0&pb=45&mr=1®ions=
返回的json格式:
{"code":0,"success":true,"msg":"","data":[{"ip":"127.0.0.1", "port":123456}]}
那么获取代理的方法就很简单了:
def proxy_zhima(self):
url = 'http://webapi.http.zhimacangku.com/getip?num=2&type=2&pro=&city=0&yys=0' \
'&port=1&pack=***&ts=1&ys=0&cs=0&lb=1&sb=0&pb=45&mr=1®ions='
resp = parse_url(url)
html = json.loads(resp)
code = html.get('code')
success = html.get('success')
if code != 0 or success == 'false':
print(html)
return
datas = html.get('data')
for data in datas:
yield data.get('ip') + ':' + str(data.get('port'))
https检测
原方案实现了http的代理检测,而我们有时会用到https的代理
这里就要在检测模块增加https的检测方法、并在数据入库的时候标识此次入库的代理为http/https
aiohttp检测https代理的方式和检测http代理方式相同,只需要切换检测url为https的即可
逻辑如下:
首先获取代理并默认检测代理是否是https,检测失败则再次检测http
这样取代理的时候默认取https代理(因为https代理所有的http协议都可以用),至于非https默认只入库,特殊情况才使用
# tester.py -> class ProxyTester
async def test_single_proxy(self, proxy):
"""
测试一个代理,如果有效,即入库
"""
scheme = 'http://'
test_url = HTTPS_TEST_URL
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = scheme + proxy
async def test_proxy(https=True):
name = 'https' if https else 'http'
async with session.get(test_url, proxy=real_proxy, timeout=10) as response:
if response.status == 200 or response.status == 429:
self._conn.put(proxy, https)
print('Valid {} proxy'.format(name), proxy)
else:
print('Invalid {} status'.format(name), response.status, proxy)
self._conn.delete(proxy)
try:
async with aiohttp.ClientSession() as session:
try:
await test_proxy()
except:
try:
test_url = HTTP_TEST_URL
await test_proxy(False)
except Exception as e:
self._conn.delete(proxy)
print('Invalid proxy', proxy)
print('session error', e)
except Exception as e:
print(e)
注意方法test_single_proxy内部还嵌套了test_proxy方法,用于检测业务
内层嵌套函数默认可以获取到外层的上下文(环境变量)
检测成功即入库,注意调用方法self._conn.put(proxy, https)
调用数据库实例MongodbClient.put的方法,此时我已经修改put方法的实现,需要传入两个参数,https参数用来标识此次入的的代理类型
此时我们关注一下mongo的api具体实现:
class MongodbClient(object):
def __init__(self, table=TABLE):
self.table = table
self.client = MongoClient(HOST, PORT)
self.db = self.client[NAME]
def change_table(self, table):
self.table = table
def proxy_num(self):
"""
得到数据库中代理num最高的数
"""
if self.get_nums != 0:
self.sort()
datas = [i for i in self.db[self.table].find()]
nums = []
for data in datas:
nums.append(data['num'])
return max(nums)
else:
return 0
def get(self, count):
"""
从数据库左侧拿到相应数量的代理
"""
if self.get_nums != 0:
self.sort()
datas = [i for i in self.db[self.table].find()][0:count]
proxies = []
for data in datas:
proxies.append(data['proxy'])
# self.delete(data['proxy'])
return proxies
return None
def put(self, proxy, https=False):
"""
放置代理到数据库
"""
num = self.proxy_num() + 1
if self.db[self.table].find_one({'proxy': proxy}):
pass
else:
self.db[self.table].insert({'proxy': proxy, 'num': num, 'http/s': https})
# self.db[self.table].insert({'proxy': proxy, 'num': num})
def pop(self, https=False):
"""
从数据库右侧拿到一个代理
"""
if self.get_nums != 0:
self.sort()
data = random.choice([i for i in self.db[self.table].find({'http/s': https})])
# data = [i for i in self.db[self.table].find({'http/s': https})][-1]
proxy = data['proxy'] if data != None else None
# 取出来使用后就从池中移除
# self.delete(proxy)
# 改变策略保留ip
return proxy
return None
def delete(self, value):
"""
如果代理没有通过检查,就删除
"""
self.db[self.table].remove({'proxy': value})
def sort(self):
"""
按num键的大小升序
"""
self.db[self.table].find().sort('num', ASCENDING)
def clean(self):
"""
清空数据库
"""
self.client.drop_database('proxy')
@property
def get_nums(self):
"""
得到数据库代理总数
"""
return self.db[self.table].count()
@property
def get_count(self):
# 分别统计http/s的代理总数
http = self.db[self.table].find({'http/s': False}).count()
https = self.db[self.table].find({'http/s': True}).count()
return http, https
其中put方法入库的实现:
self.db[self.table].insert({'proxy': proxy, 'num': num, 'http/s': https})
可以看到插入的mongo文档添加了一个'http/s'字段用来标识代理的类型
get_count方法会分别返回两种代理类型的数量
元类
博主之前的文章有介绍过元类,熟悉了就不难发现这个代理池实现的元类使用稍微有一点冗余部分
此前的元类中实现在类中添加两个属性,代理方法数量、代理方法名
其中代理方法名是一个列表类型,有了列表我们就可以遍历列表了,所以此时元类中只需要一个属性即可,代理方法的数量是多余的:
class ProxyMetaclass(type):
"""
元类,在ProxyGetter类中加入
__CrawlFunc__参数
表示爬虫函数
"""
def __new__(cls, name, bases, attrs):
attrs['__CrawlFunc__'] = []
for k in attrs.keys():
if k.startswith('proxy_'):
attrs['__CrawlFunc__'].append(k)
return super(ProxyMetaclass, cls).__new__(cls, name, bases, attrs)
此时注意,原先是返回type.__new__(cls, name, bases, attrs)
我们修改为更推荐的创建元类方式:super(ProxyMetaclass, cls).__new__(cls, name, bases, attrs)
添加代理的方法也要修改:
# adder.py -> class PoolAdder
def add_to_pool(self):
"""
补充代理
"""
print('PoolAdder is working...')
proxy_count = 0
while not self.is_over_threshold():
# 迭代所有的爬虫
# __CrawlFunc__是爬虫方法
for callback in self._crawler.__CrawlFunc__:
raw_proxies = self._crawler.get_raw_proxies(callback)
# 测试爬取到的代理
self._tester.set_raw_proxies(raw_proxies)
self._tester.test()
proxy_count += len(raw_proxies)
if self.is_over_threshold():
print('Proxy is enough, waiting to be used...')
break
这样修改下来,代码量、业务逻辑相对之前会简洁一些,要善用元类
配置文件部分
配置部分做一些微调,添加https的检测url
修改调度周期
# 供测试的url
HTTP_TEST_URL = 'http://mini.eastday.com/assets/v1/js/search_word.js'
HTTPS_TEST_URL = 'https://mp.weixin.qq.com/mp/getappmsgext'
# Pool 的低阈值和高阈值
POOL_LOWER_THRESHOLD = 3
POOL_UPPER_THRESHOLD = 5
# 两个调度进程的周期
VALID_CHECK_CYCLE = 3
POOL_LEN_CHECK_CYCLE = 5
其中每隔3s检测池中代理的有效性,每隔5s检测代理池容量大小是否在阈值范围(3~5)
大体就实现定制自己的代理池服务了>_<