前言
1.适合有一定程序思维的速成者。
2.适合有其他语言基础的速成者。
爬虫流程
1.明确需求
就是你要从哪个网址拿到什么信息,本次我们需要yelp 的二级网址拿到商户的商户名,电话,地址,访问链接。
2.分析网址
查找你所需要的内容的相关熟悉。
3.开发调试
vscode 或者 pycharm
3.输出文档
输出csv文档
开始搞活
1.先看下需求
需要在yelp网址上查找固定的几个美国州 ['FL','GA','IL','IN','MD','MA','MI', 'NJ','NY','NC','OH','PA','SC','TN','TX','VA']内 "All You Can Eat" l类型的店铺信息.
如下图所示,具体链接为https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL
可以看到总21页,每页10条,除了正常要获取的10条商户信息外还有,数量不固定的广告位。为了好爬取信息,我这边直接忽略广告位的信息。直接爬取每页最多10条的商户信息。
2.点击第二页查看下不同页数的 访问规律:
https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL&start=10
每页start+10
3.接着查看 我们需要的数据在哪个位置
打开浏览器的F12 进入调试模式,建议google浏览器
通过初步分析发现我们要的数据在这边,看过去class name都是固定的,也有我们要的列表点击进去的二级url。
或者通过这种方式查找
通过观察发现数据在<Script>标签下面
下面这个是列表的名称和地址
本文通过的是爬取Script标签数据,来获取二级网址。
4.获取到二级网址后,我们继续分析二级网址内容
如下图所示,电话,地址都有了。
通过f12 看看这些东西在哪里
看上图就可以直接看出name,telephone,address
5.东西都查找完了,这个时候,如何提取,如何编写代码呢,对于不熟悉python的人来说,chatGpt就可以派上用场了,哪里报错了,就问chatGpt,边问边搞。
比如:
如何获取scrpt 标签中的内容
如何爬取script标签中的指定内容(正则)
只要你懂的问,就有结果。
接下来我们用chatGpt来边问边写代码.
先看看代码主体流程
# 程序结构
class xxxSpider(object):
def __init__(self):
# 定义常用变量,比如url或计数变量等
def get_html(self):
# 获取响应内容函数,使用随机User-Agent
def parse_html(self):
# 使用正则表达式来解析页面,提取数据
def write_html(self):
# 将提取的数据按要求保存,csv、MySQL数据库等
def run(self):
# 主函数,用来控制整体逻辑
if __name__ == '__main__':
# 程序开始运行时间
spider = xxxSpider()
spider.run()
基本上的爬虫流程就是这样的,定义url,获取url的内容,解析url,把获取到的东西写到文件中存储。
具体的代码如下:
csv 文件工具 csvUtil.py
import csv
def writeCsv(name,urls):
with open(name+'.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
#构建字段名称,也就是key
# 写入表头
writer.writerow(['url'])
# 写入每个 URL
for url in urls:
writer.writerow([url])
def initShopCsv(name):
with open(name+'.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
#构建字段名称,也就是key
# 写入表头
writer.writerow(['ShopName',"Phone","Address","Url"])
def writeShopCsv(data,name):
with open(name+'.csv', 'a', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 写入每个
writer.writerow(data)
print("写入数据:"+str(data))
常量类address_info.py
states=['FL','GA','IL','IN','MD','MA','MI',
'NJ','NY','NC','OH','PA','SC','TN','TX','VA']
主体代码 yelp.py
from urllib import parse
import time
import random
from fake_useragent import UserAgent
import requests
from lxml import etree
from bs4 import BeautifulSoup
import re
import json
import csvUtil
from static.address_info import states
#忽略ssl报错
requests.packages.urllib3.disable_warnings()
import concurrent.futures
class openTableSpider(object):
#1.初始化,设置基地址
def __init__(self):
self.url='https://www.yelp.com/search?{}'
#2.搜索店面,加入头部信息,查询时间理应大于当前时间,否则容易被发现不是人工点击
#查找到收寻的店明的列表信息
def searchShopHtml(self,url,state,size):
#参数信息
params={
'find_desc':"All You Can Eat",
'find_loc':state,
'start':size
}
#格式化信息
full_url=url.format(parse.urlencode(params))
print(""+full_url)
ua=UserAgent()
# req=requests.Request(url=full_url,)
respone=requests.get(url=full_url,headers={'User-Agent':'AdsBot-Google'},verify=False)
# 使用 BeautifulSoup 解析 HTML
soup = BeautifulSoup(respone.text, 'html.parser')
# 查找所有的 <script> 标签
script_tags = soup.find_all('script')
# 输出每个 <script> 标签的内容
for script in script_tags:
script_str=script.string
# 关键字 businessUrl /biz 开头
if script_str and 'businessUrl' in script_str:
# print("开始:"+script_str)
#查找到相应的访问url
link_match=re.findall(r'"businessUrl":"([^"]+)"',script_str)
if link_match:
format_urls=[x for x in link_match if "ad_business_id" not in x]
# 去重
urls = list(set(format_urls))
# print(len(urls))
print(str(urls))
# csvUtil.writeCsv(shopName,link_match)
return urls
# 3.解析查找到的每家店的url,并且获取到店面和电话号码,以及地址。
def parse_Url(self,url,state):
url="https://www.yelp.com/"+url
print("Start doing "+url)
ua=UserAgent()
respone=requests.get(url=url,headers={'User-Agent':ua.random},verify=False)
# 使用 BeautifulSoup 解析 HTML
soup = BeautifulSoup(respone.text, 'html.parser')
# 查找所有的 <script> 标签
script_tags = soup.find_all('script')
# 输出每个 <script> 标签的内容
# 找到name,phone,address 数据的位置,发现在 script标签底下
# name window.__INITIAL_STATE__ restaurantProfile restaurant name address
# phone= contactInformation formattedPhoneNumber
# address address":{Phone number
for script in script_tags:
script_str=script.string
address=name=phone=""
if script_str and 'telephone' in script_str:
# print(script_str)
m_address = re.search( r'"address":{(.*?)},', script_str)
if m_address:
# {"line1":"5115 Wilshire Blvd","line2":"","state":"CA","city":"Los Angeles","postCode":"90036","country":"United States","__typename":"Address"},
temp=m_address.group(1)
# print(temp)
try:
streetAddress = re.search(r'"streetAddress":"([^"]*)"', temp).group(1)
addressLocality = re.search( r'"addressLocality":"([^"]*)"', temp).group(1)
addressCountry = re.search( r'"addressCountry":"([^"]*)"', temp).group(1)
addressRegion = re.search( r'"addressRegion":"([^"]*)"', temp).group(1)
postalCode = re.search(r'"postalCode":"([^"]*)"', temp).group(1)
address=streetAddress.replace("\\n","")+","+addressLocality+","+addressRegion+","+postalCode
print(address)
# flag=any(item.upper() in state.upper() for item in openTableAddress)
# # 如果不在地址池中的店,则不记录下来
# if flag is False:
# return
except Exception as e:
address=streetAddress+","+addressLocality+","+addressRegion+","+postalCode
print("地址解析异常:", e)
m_name = re.search( r'"name":"(.*?)",', script_str)
if m_name:
name=m_name.group(1)
# print(name)
m_phone = re.search( r'telephone":"(.*?)",', script_str)
if m_phone:
phone=m_phone.group(1)
# print(phone)
data=[name,phone,address,url]
csvUtil.writeShopCsv(data=data,name=state)
# 4.一个一个流程
def doTransaction(self,state,page):
urls=self.searchShopHtml(self.url,state,page)
if not urls:
return
print(len(urls))
while len(urls)%10==0:
print(len(urls))
new_urls=[]
reptry=0
page+=10
reptry=0
#重试10次
while(reptry<10):
try:
new_urls = self.searchShopHtml(spider.url,state, page)
break
except Exception as e:
print("发生了其他异常:", e)
# 重试一次
reptry+=1
time.sleep(random.randint(5,10))
# 如果没有新的URLs,即查询结果为空,终止循环
if not new_urls:
break
urls += new_urls
time.sleep(random.randint(5,10))
count=0
#重试10次
for url in urls:
reptry=0
while(reptry<10):
try:
self.parse_Url(url,state)
count+=1
print(state+" deal count "+str(count))
break
except Exception as e:
print("发生了其他异常:", e)
reptry+=1
finally:
time.sleep(random.randint(3,10))
#5.入口函数
def run(self):
for state in states:
csvUtil.initShopCsv(state)
# 创建线程池并并发执行任务
with concurrent.futures.ThreadPoolExecutor() as executor:
# 使用executor.map并发执行任务,传递参数列表
executor.map(self.doTransaction, states, [0] * len(states))
print("All tasks have been submitted.")
# # 单线程处理
# threads = []
# for state in states:
# print(state)
# csvUtil.initShopCsv(state)
# thread = threading.Thread(target=)
# thread.start()
# threads.append(thread)
# # 等待所有线程完成
# for thread in threads:
# thread.join()
if __name__=='__main__':
start=time.time()
spider=openTableSpider() #实例化一个对象spider
# spider.searchShopHtml(spider.url,"New York",0) #调用入口函数
# spider.parse_Url("biz/zen-korean-bbq-duluth?osq=All+You+Can+Eat+Buffet","GEORGIA")
spider.run()
end=time.time()
#查看程序执行时间
print('执行时间:%.2f'%(end-start))
具体内容查看注释,有不懂操作的直接问gpt