咱们平时上网爬数据,最头疼的就是IP被封。单台机器猛刷,网站一眼就能识破。想把活儿干得又快又稳,就得把任务拆开,让多台机器或多个进程一起干,每个还用不同的IP出口——这就好比让一群人轮流换装去排队,既减轻压力又降低风险。
下面是一个基于Python的分布式爬虫实现,使用多进程和代理隧道技术来分散请求压力并降低IP被封风险。
importmultiprocessing
importrequests
fromurllib.parseimporturlparse
importtime
importrandom
fromqueueimportQueue
importlogging
fromdatetimeimportdatetime
# 设置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')
logger=logging.getLogger(__name__)
classDistributedCrawler:
def__init__(self,proxy_list,num_processes=4,requests_per_ip=100,delay_range=(1,3)):
"""
初始化分布式爬虫
Args:
proxy_list: 代理服务器列表,格式为 [{"http": "http://ip:port", "https": "https://ip:port"}, ...]
num_processes: 进程数量
requests_per_ip: 每个IP发出的请求数量限制
delay_range: 请求延迟范围(秒)
"""
self.proxy_list=proxy_list
self.num_processes=num_processes
self.requests_per_ip=requests_per_ip
self.delay_range=delay_range
self.task_queue=multiprocessing.Queue()
self.result_queue=multiprocessing.Queue()
self.proxy_usage=multiprocessing.Manager().dict()
# 初始化代理使用计数
forproxyinproxy_list:
key=self._get_proxy_key(proxy)
self.proxy_usage[key]=0
def_get_proxy_key(self,proxy):
"""生成代理的唯一标识"""
http_proxy=proxy.get('http','')
https_proxy=proxy.get('https','')
returnf"{http_proxy}_{https_proxy}"
defadd_tasks(self,urls):
"""添加任务到队列"""
forurlinurls:
self.task_queue.put(url)
# 添加结束信号
for_inrange(self.num_processes):
self.task_queue.put(None)
defget_available_proxy(self):
"""获取可用的代理,基于使用次数进行负载均衡"""
# 按使用次数排序,选择使用最少的代理
sorted_proxies=sorted(
self.proxy_list,
key=lambdap:self.proxy_usage[self._get_proxy_key(p)]
)
# 返回使用次数未超限的代理
forproxyinsorted_proxies:
key=self._get_proxy_key(proxy)
ifself.proxy_usage[key]<self.requests_per_ip:
self.proxy_usage[key]+=1
returnproxy
# 如果所有代理都超限,随机选择一个并重置计数
proxy=random.choice(self.proxy_list)
key=self._get_proxy_key(proxy)
self.proxy_usage[key]=1
returnproxy
defworker(self,worker_id):
"""工作进程函数"""
logger.info(f"Worker {worker_id} started")
whileTrue:
url=self.task_queue.get()
ifurlisNone:# 结束信号
break
try:
# 获取代理
proxy=self.get_available_proxy()
logger.info(f"Worker {worker_id} using proxy: {proxy}")
# 添加随机延迟
delay=random.uniform(*self.delay_range)
time.sleep(delay)
# 发送请求
headers={
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response=requests.get(
url,
proxies=proxy,
headers=headers,
timeout=30
)
# 处理响应
ifresponse.status_code==200:
result={
'url':url,
'content':response.text[:1000],# 只取前1000个字符
'status':'success',
'worker_id':worker_id,
'proxy':proxy
}
self.result_queue.put(result)
logger.info(f"Worker {worker_id} successfully crawled {url}")
else:
self.result_queue.put({
'url':url,
'status':f'error: status code {response.status_code}',
'worker_id':worker_id,
'proxy':proxy
})
logger.warning(f"Worker {worker_id} got status code {response.status_code} for {url}")
exceptExceptionase:
self.result_queue.put({
'url':url,
'status':f'error: {str(e)}',
'worker_id':worker_id,
'proxy':proxy
})
logger.error(f"Worker {worker_id} error crawling {url}: {str(e)}")
logger.info(f"Worker {worker_id} finished")
defrun(self):
"""启动爬虫"""
processes=[]
# 启动工作进程
foriinrange(self.num_processes):
p=multiprocessing.Process(target=self.worker,args=(i,))
p.start()
processes.append(p)
# 等待所有进程完成
forpinprocesses:
p.join()
# 收集结果
results=[]
whilenotself.result_queue.empty():
results.append(self.result_queue.get())
returnresults
# 示例使用
if__name__=="__main__":
# 代理列表示例 (实际使用时需要替换为真实的代理IP)
proxy_list=[
{"http":"http://proxy1.example.com:8080","https":"https://proxy1.example.com:8080"},
{"http":"http://proxy2.example.com:8080","https":"https://proxy2.example.com:8080"},
{"http":"http://proxy3.example.com:8080","https":"https://proxy3.example.com:8080"},
{"http":"http://proxy4.example.com:8080","https":"https://proxy4.example.com:8080"},
]
# 要爬取的URL列表
urls_to_crawl=[
"https://httpbin.org/ip",
"https://httpbin.org/user-agent",
"https://httpbin.org/headers",
"https://httpbin.org/get",
]*5# 重复几次以展示分布式效果
# 创建分布式爬虫实例
crawler=DistributedCrawler(
proxy_list=proxy_list,
num_processes=4,# 使用4个进程
requests_per_ip=3,# 每个代理IP最多使用3次
delay_range=(1,2)# 请求间隔1-2秒
)
# 添加任务
crawler.add_tasks(urls_to_crawl)
# 运行爬虫并获取结果
results=crawler.run()
# 打印结果摘要
success_count=sum(1forrinresultsifr['status']=='success')
print(f"爬取完成! 成功: {success_count}, 失败: {len(results)-success_count}")
# 打印部分结果
fori,resultinenumerate(results[:3]):
print(f"结果 {i+1}:")
print(f" URL: {result['url']}")
print(f" 状态: {result['status']}")
print(f" 工作进程: {result.get('worker_id','N/A')}")
print(f" 使用的代理: {result.get('proxy','N/A')}")
if'content'inresult:
print(f" 内容预览: {result['content'][:100]}...")
print()
架构说明
这个分布式爬虫架构包含以下关键组件:
1、多进程工作模式:使用Python的multiprocessing模块创建多个工作进程,每个进程独立处理爬取任务。
2、代理隧道管理:
维护一个代理IP池
实现负载均衡算法,优先选择使用次数较少的代理
限制单个代理IP的请求频率,避免过度使用
3、任务队列:使用多进程安全的Queue来分配任务给各个工作进程。
4、随机化请求模式:
在每个请求之间添加随机延迟
使用不同的User-Agent头部
通过多个出口IP分散请求
扩展建议
对于更复杂的分布式爬虫系统,可以考虑以下扩展:
分布式任务队列:使用Redis或RabbitMQ替代multiprocessing.Queue,实现真正的跨机器分布式部署。
代理IP池服务:搭建独立的代理IP池服务,动态管理代理IP的可用性、速度和地域分布。
速率限制:实现更精细的请求速率控制,根据目标网站的robots.txt和实际响应调整请求频率。
故障恢复:添加重试机制和故障转移功能,提高系统的鲁棒性。
监控和日志:集成更完善的监控系统,实时跟踪爬虫状态、性能指标和错误率。
注意:在实际使用时,请确保遵守目标网站的robots.txt协议和相关法律法规,尊重网站的爬虫政策。
总之,分布式爬虫靠分工协作和IP轮换,把单个压力化解于无形。这样不仅效率翻倍,被封的风险也大大降低,让数据获取更加稳健顺畅。用好这个架构,爬虫就能像真正的团队作业一样,既高效又隐蔽。