告别IP被封!分布式爬虫的“隐身”与“分身”术

咱们平时上网爬数据,最头疼的就是IP被封。单台机器猛刷,网站一眼就能识破。想把活儿干得又快又稳,就得把任务拆开,让多台机器或多个进程一起干,每个还用不同的IP出口——这就好比让一群人轮流换装去排队,既减轻压力又降低风险。

下面是一个基于Python的分布式爬虫实现,使用多进程和代理隧道技术来分散请求压力并降低IP被封风险。

importmultiprocessing

importrequests

fromurllib.parseimporturlparse

importtime

importrandom

fromqueueimportQueue

importlogging

fromdatetimeimportdatetime

# 设置日志

logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')

logger=logging.getLogger(__name__)

classDistributedCrawler:

def__init__(self,proxy_list,num_processes=4,requests_per_ip=100,delay_range=(1,3)):

"""

       初始化分布式爬虫


       Args:

           proxy_list: 代理服务器列表,格式为 [{"http": "http://ip:port", "https": "https://ip:port"}, ...]

           num_processes: 进程数量

           requests_per_ip: 每个IP发出的请求数量限制

           delay_range: 请求延迟范围(秒)

       """

self.proxy_list=proxy_list

self.num_processes=num_processes

self.requests_per_ip=requests_per_ip

self.delay_range=delay_range

self.task_queue=multiprocessing.Queue()

self.result_queue=multiprocessing.Queue()

self.proxy_usage=multiprocessing.Manager().dict()


# 初始化代理使用计数

forproxyinproxy_list:

key=self._get_proxy_key(proxy)

self.proxy_usage[key]=0


def_get_proxy_key(self,proxy):

"""生成代理的唯一标识"""

http_proxy=proxy.get('http','')

https_proxy=proxy.get('https','')

returnf"{http_proxy}_{https_proxy}"


defadd_tasks(self,urls):

"""添加任务到队列"""

forurlinurls:

self.task_queue.put(url)


# 添加结束信号

for_inrange(self.num_processes):

self.task_queue.put(None)


defget_available_proxy(self):

"""获取可用的代理,基于使用次数进行负载均衡"""

# 按使用次数排序,选择使用最少的代理

sorted_proxies=sorted(

self.proxy_list,

key=lambdap:self.proxy_usage[self._get_proxy_key(p)]

       )


# 返回使用次数未超限的代理

forproxyinsorted_proxies:

key=self._get_proxy_key(proxy)

ifself.proxy_usage[key]<self.requests_per_ip:

self.proxy_usage[key]+=1

returnproxy


# 如果所有代理都超限,随机选择一个并重置计数

proxy=random.choice(self.proxy_list)

key=self._get_proxy_key(proxy)

self.proxy_usage[key]=1

returnproxy


defworker(self,worker_id):

"""工作进程函数"""

logger.info(f"Worker {worker_id} started")


whileTrue:

url=self.task_queue.get()

ifurlisNone:# 结束信号

break


try:

# 获取代理

proxy=self.get_available_proxy()

logger.info(f"Worker {worker_id} using proxy: {proxy}")


# 添加随机延迟

delay=random.uniform(*self.delay_range)

time.sleep(delay)


# 发送请求

headers={

'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'

               }


response=requests.get(

url,

proxies=proxy,

headers=headers,

timeout=30

               )


# 处理响应

ifresponse.status_code==200:

result={

'url':url,

'content':response.text[:1000],# 只取前1000个字符

'status':'success',

'worker_id':worker_id,

'proxy':proxy

                   }

self.result_queue.put(result)

logger.info(f"Worker {worker_id} successfully crawled {url}")

else:

self.result_queue.put({

'url':url,

'status':f'error: status code {response.status_code}',

'worker_id':worker_id,

'proxy':proxy

                   })

logger.warning(f"Worker {worker_id} got status code {response.status_code} for {url}")


exceptExceptionase:

self.result_queue.put({

'url':url,

'status':f'error: {str(e)}',

'worker_id':worker_id,

'proxy':proxy

               })

logger.error(f"Worker {worker_id} error crawling {url}: {str(e)}")


logger.info(f"Worker {worker_id} finished")


defrun(self):

"""启动爬虫"""

processes=[]


# 启动工作进程

foriinrange(self.num_processes):

p=multiprocessing.Process(target=self.worker,args=(i,))

p.start()

processes.append(p)


# 等待所有进程完成

forpinprocesses:

p.join()


# 收集结果

results=[]

whilenotself.result_queue.empty():

results.append(self.result_queue.get())


returnresults

# 示例使用

if__name__=="__main__":

# 代理列表示例 (实际使用时需要替换为真实的代理IP)

proxy_list=[

{"http":"http://proxy1.example.com:8080","https":"https://proxy1.example.com:8080"},

{"http":"http://proxy2.example.com:8080","https":"https://proxy2.example.com:8080"},

{"http":"http://proxy3.example.com:8080","https":"https://proxy3.example.com:8080"},

{"http":"http://proxy4.example.com:8080","https":"https://proxy4.example.com:8080"},

   ]


# 要爬取的URL列表

urls_to_crawl=[

"https://httpbin.org/ip",

"https://httpbin.org/user-agent",

"https://httpbin.org/headers",

"https://httpbin.org/get",

]*5# 重复几次以展示分布式效果


# 创建分布式爬虫实例

crawler=DistributedCrawler(

proxy_list=proxy_list,

num_processes=4,# 使用4个进程

requests_per_ip=3,# 每个代理IP最多使用3次

delay_range=(1,2)# 请求间隔1-2秒

   )


# 添加任务

crawler.add_tasks(urls_to_crawl)


# 运行爬虫并获取结果

results=crawler.run()


# 打印结果摘要

success_count=sum(1forrinresultsifr['status']=='success')

print(f"爬取完成! 成功: {success_count}, 失败: {len(results)-success_count}")


# 打印部分结果

fori,resultinenumerate(results[:3]):

print(f"结果 {i+1}:")

print(f"  URL: {result['url']}")

print(f"  状态: {result['status']}")

print(f"  工作进程: {result.get('worker_id','N/A')}")

print(f"  使用的代理: {result.get('proxy','N/A')}")

if'content'inresult:

print(f"  内容预览: {result['content'][:100]}...")

print()

架构说明

这个分布式爬虫架构包含以下关键组件:

1、多进程工作模式:使用Python的multiprocessing模块创建多个工作进程,每个进程独立处理爬取任务。

2、代理隧道管理

维护一个代理IP池

实现负载均衡算法,优先选择使用次数较少的代理

限制单个代理IP的请求频率,避免过度使用

3、任务队列:使用多进程安全的Queue来分配任务给各个工作进程。

4、随机化请求模式

在每个请求之间添加随机延迟

使用不同的User-Agent头部

通过多个出口IP分散请求

扩展建议

对于更复杂的分布式爬虫系统,可以考虑以下扩展:

分布式任务队列:使用Redis或RabbitMQ替代multiprocessing.Queue,实现真正的跨机器分布式部署。

代理IP池服务:搭建独立的代理IP池服务,动态管理代理IP的可用性、速度和地域分布。

速率限制:实现更精细的请求速率控制,根据目标网站的robots.txt和实际响应调整请求频率。

故障恢复:添加重试机制和故障转移功能,提高系统的鲁棒性。

监控和日志:集成更完善的监控系统,实时跟踪爬虫状态、性能指标和错误率。

注意:在实际使用时,请确保遵守目标网站的robots.txt协议和相关法律法规,尊重网站的爬虫政策。

总之,分布式爬虫靠分工协作和IP轮换,把单个压力化解于无形。这样不仅效率翻倍,被封的风险也大大降低,让数据获取更加稳健顺畅。用好这个架构,爬虫就能像真正的团队作业一样,既高效又隐蔽。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容