1.首先创建一个获取代理ip的类,这里取名为ProxyPool。
class ProxyPool:
def get_soup(self, url):
pass
def get_youdaili(self):
pass
这个ProxyPool类中有两个方法:
- get_soup(self,url)
这个方法除了接受本身参数之外,还接受一个url参数(网址),返回一个美味的soup对象。
def get_soup(self, url):
resp = requests.get(url)
if resp.status_code == 200:
resp.encoding = "utf-8"
soup = BeautifulSoup(resp.text, "lxml")
return soup
- get_youdaili(self)
这个方法不用额外的参数,它使用方法本身里的数据,在这里是优代理网站的地址。经过对该方法的调用,网站中提供的ip会被逐个地添加到数据库中。
def get_youdaili(self):
soup = self.get_soup('http://www.youdaili.net/Daili/')
a_tag = soup.select('div.newslist_body > ul > li > a')
for i in a_tag:
url = i.get('href')
ip_re = re.compile(r'((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{2,5})@([a-zA-Z0-9]{4,7}))')
soup = self.get_soup(url)
ips = ip_re.findall(soup.text)
page_tag = soup.select('ul.pagelist > li > a')
if page_tag:
page = re.search(r'\d', page_tag[0].get_text()).group()
page = int(page)
else:
page = 1
if page >= 2: # 如果有第二页就继续爬取
for i in range(2, page + 1):
soup_sub = self.get_soup(url[:-5] + "_" + str(i) + ".html")
ips += ip_re.findall(soup_sub.text)
if ips:
for i in ips:
try:
proxy_pool.insert_one({
'ip_port': i[1],
'protocol': i[2].lower(),
'update_time': int(time.time())
})
except pymongo.errors.DuplicateKeyError as ex:
pass
- 数据库 proxy 以及数据库表单 proxy_pool 的建立
client = pymongo.MongoClient("localhost", 27017)
proxy = client['proxy']
proxy_pool = proxy['proxy_pool']
proxy_pool.ensure_index('ip_port', unique=True) # 如果有重复的ip 写进去 会报错
2.接着创建一个检测代理质量的类,这里取名为ProxyCheck。
class ProxyCheck:
ip_port_all = [(i['ip_port'], i['protocol']) for i in proxy_pool.find()] # 查询,获取所有ip
def remove_ip(self, ip_port):
pass
def get_status(self, ip_port, protocol):
pass
def check(self):
pass
这个类中有三个方法:
- remove_ip(self, ip_port)
这个方法需要一个ip_port参数(如1.255.53.81:80)),查询数据库后,若找到含有该参数的pymongo对象,他会给该对象添加一个为None的speed属性,随后判断该对象的更新时间是否大于一周,并删除大于一周的对象。
def remove_ip(self, ip_port): # 如果没能成功响应,将执行次方法,将其响应速度设置为空并且判断存在时间是否超过一周
ip_data = proxy_pool.find({'ip_port': ip_port}) # <pymongo.cursor.Cursor object at 0x042D8FF0>
proxy_pool.update_one({'ip_port': ip_port}, {'$set': {'speed': None}})
if int(time.time()) - ip_data[0]['update_time'] > 604800: # time.time()是指1970纪元后经过的浮点秒数
proxy_pool.remove({'ip_port': ip_port})
- get_status(self, ip_port, protocol)
该函数需要接受2个参数,如('1.255.53.81:80', 'http'),并试图用该代理参数去访问一个正常的页面,若成功便将反应时间更新到speed属性,同时更新update属性,否者就调用remove_ip()方法来删除该对象。
def get_status(self, ip_port, protocol):
url = "http://fz.58.com/"
proxies = {"http": protocol + "://" + ip_port}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
}
time1 = time.clock() # 以浮点数计算的秒数返回当前的CPU时间,用来衡量不同程序的耗时
try: # 使用代理常常容易出错
resp = requests.get(url, headers=headers, proxies=proxies, timeout=6)
except Exception as ex:
print(ex)
return self.remove_ip(ip_port)
time2 = time.clock()
time_result = time2 - time1 # 计算响应时间
if resp.status_code == 200:
print(ip_port)
proxy_pool.update_one({"ip_port": ip_port},
{'$set': {'speed': time_result, 'update_time': int(time.time())}})
else:
self.remove_ip(ip_port)
- check()
开启多线程进行检测
def check(self):
pool = Pool(20)
for i in self.ip_port_all:
if i[1] == 'http':
pool.apply_async(self.get_status, args=i)
pool.close()
pool.join()
3.if name=='main'启动部分:
if __name__ == "__main__":
if len(sys.argv) > 1: # 接收第一个参数,第一个参数为脚本运行的间隔时间
time_sleep = int(sys.argv[1])
else:
time_sleep = 60 * 60
while (True):
pp = ProxyPool()
pp.get_youdaili()
pc = ProxyCheck()
pc.check()
time.sleep(time_sleep)
全篇代码如下:
# coding:utf-8
# 因为网络上的代理毕竟是有限的,所以希望大家不要滥用
import re
import requests
import time
import pymongo
import sys
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool
client = pymongo.MongoClient("localhost", 27017)
proxy = client['proxy']
proxy_pool = proxy['proxy_pool']
proxy_pool.ensure_index('ip_port', unique=True) # 如果有重复的ip 写进去 会报错
class ProxyPool: # 获取代理ip的类
def get_soup(self, url):
resp = requests.get(url)
if resp.status_code == 200:
resp.encoding = "utf-8"
soup = BeautifulSoup(resp.text, "lxml")
return soup
def get_youdaili(self):
soup = self.get_soup("http://www.youdaili.net/Daili/")
a_tag = soup.select("div.newslist_body > ul > li > a")
for i in a_tag:
url = i.get('href')
ip_re = re.compile(r'((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{2,5})@([a-zA-Z0-9]{4,7}))')
soup = self.get_soup(url)
ips = ip_re.findall(soup.text)
page_tag = soup.select("ul.pagelist > li > a") # 是否还有第二页
if page_tag:
page = re.search(r"\d", page_tag[0].get_text()).group()
page = int(page)
else:
page = 1
if page >= 2: # 如果有第二页就继续爬取
for i in range(2, page + 1):
soup_sub = self.get_soup(url[:-5] + "_" + str(i) + ".html")
ips += ip_re.findall(soup_sub.text)
if ips:
for i in ips:
try: # 数据库不允许插入相同的ip,如果有相同的,这里将会报错,所以加个try
proxy_pool.insert_one({
'ip_port': i[1],
'protocol': i[2].lower(), # 协议
'update_time': int(time.time()) # 抓取时的时间
})
except pymongo.errors.DuplicateKeyError as ex:
pass
print(url)
class ProxyCheck:
ip_port_all = [(i['ip_port'], i['protocol']) for i in proxy_pool.find()] # 查询,获取所有ip
def remove_ip(self, ip_port): # 如果没能成功响应,将执行次方法,将其响应速度设置为空并且判断存在时间是否超过一周
ip_data = proxy_pool.find({'ip_port': ip_port})
proxy_pool.update_one({'ip_port': ip_port}, {'$set': {'speed': None}})
if int(time.time()) - ip_data[0]['update_time'] > 604800:
proxy_pool.remove({'ip_port': ip_port})
def get_status(self, ip_port, protocol):
url = "http://fz.58.com/"
proxies = {"http": protocol + "://" + ip_port}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
}
time1 = time.clock()
try: # 使用代理常常容易出错
resp = requests.get(url, headers=headers, proxies=proxies, timeout=6)
except Exception as ex:
print(ex)
return self.remove_ip(ip_port)
time2 = time.clock()
time_result = time2 - time1 # 计算响应时间
if resp.status_code == 200:
print(ip_port)
proxy_pool.update_one({"ip_port": ip_port},
{'$set': {'speed': time_result, 'update_time': int(time.time())}})
else:
self.remove_ip(ip_port)
def check(self): # 开启多线程进行检测
pool = Pool(20)
for i in self.ip_port_all:
if i[1] == 'http':
pool.apply_async(self.get_status, args=i)
pool.close()
pool.join()
if __name__ == "__main__":
if len(sys.argv) > 1: # 接收第一个参数,第一个参数为脚本运行的间隔时间
time_sleep = int(sys.argv[1])
else:
time_sleep = 60 * 60
while (True):
pp = ProxyPool()
pp.get_youdaili()
pc = ProxyCheck()
pc.check()
time.sleep(time_sleep)