经验总结:
一、亚马逊对于国外的IP友好,对于国内IP不友好
小规模采集时,使用香港VPN或者美国IP,直接通过 requests 请求即可,触发 robot 验证的几率大概是15%。
如果使用国内IP,触发 robot 验证的几率就有90%。
验证码识别解决:http://www.886it.cn/
二、如果你是大规模采集的用户,需要考虑以下几个方面:
高性能爬虫设计:
使用高效的异步网络库,如aiohttp或利用分布式爬虫系统,如feapder,并部署多个实例以提高并发能力。
IP解锁与代理管理:
- 国内代理池的构建和管理,可以自建或使用第三方服务。
- 实现自动切换IP的中间件,一旦检测到IP被封,自动更换代理。
- 定期验证代理池中IP的有效性,确保代理IP的质量。
数据解析与存储:
- 使用feapder框架进行数据抓取和解析,利用其强大的Xpath和CSS选择器。
- 数据存储可以考虑使用SQLite,MySQL、PostgreSQL或MongoDB。
时间与成本控制:
- 优化爬虫的抓取策略,比如合理安排爬取时间、避免重复爬取等。
- 成本控制方面,可以通过提高代理IP的复用率、减少无效请求、优化数据处理流程等方式来降低成本。
代码优化:
- 对爬虫代码进行性能分析和优化,减少不必要的计算和网络请求。
- 使用缓存策略,减少对同一URL的重复访问。
解决方案:
- 采用自建验证码识别系统,减少识别费用产生成本
- 采用 chrome 动态采集 + requests 采集同时进行。chrome + webdriver 用于识别和解锁IP
- 这样就能实现数据的高效采集,一边解锁IP,一边上锁IP
- 这样IP被锁的概率由原先的80%可以降低到40%
- 解析页面数据
- 采集完后需要写入数据库,这里有两种写入方式:一是同步写入,会有并发性能问题;二是异步写入,性能提高700%
以下是异步写入数据库的方法
将采集好的数据先保存为本地json,然后通过其他进程异步上传,本进程只做采集。经过测试,改造成异步后性能提升700%
代码部分:
识别和解锁ip
# Amazon answers with its robot-check page when it distrusts the proxy IP;
# that page is recognised by the captcha form's action path.
is_robot = "/errors/validateCaptcha" in response.text
if is_robot:
    # The robot check popped up, so this proxy is considered locked.
    print(response.url + "mmmm" + proxy_about + '+++++++++' + "#机器人验证弹出================================")
    # Read the captcha image URL from the robot-check form.
    # NOTE(review): assumes `response` exposes the same .css() selector API
    # that the parsing code in this file uses on its elements - confirm.
    code_img = response.css('div.a-box-inner div.a-text-center img::attr(src)').extract_first()
    # Solve the captcha; the answer is what gets submitted to unlock the IP.
    solution = get_amazon_code(code_img)
def get_amazon_code(captcha_url="https://images-na.ssl-images-amazon.com/captcha/ahkfsmoa/Captcha_lvghpcxkgh.jpg", captcha_type="amazon", key='B4EF4EV1O1', timeout=10):
    """Ask the 886it.cn recognition service to solve a captcha image.

    Args:
        captcha_url: URL of the captcha image to recognise.
        captcha_type: captcha family name sent to the service.
        key: API key sent to the service.
            NOTE(review): the default is a credential embedded in source;
            consider loading it from configuration instead.
        timeout: seconds to wait for the service before giving up, so a
            stalled service cannot hang the crawler.

    Returns:
        The value of the ``code`` field of the service's JSON reply.

    Raises:
        requests.RequestException: on network failure, timeout or a non-2xx
            HTTP status.
        KeyError: if the JSON reply carries no ``code`` field.
    """
    # max req 20/qps
    payload = {"captcha_url": captcha_url, "key": key, "captcha_type": captcha_type}
    headers = {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "Origin": "http://www.886it.cn",
        "Pragma": "no-cache",
        "Referer": "http://www.886it.cn/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "X-Requested-With": "XMLHttpRequest",
    }
    response = requests.request(
        "POST",
        "http://www.886it.cn/api/captcha/code",
        headers=headers,
        data=payload,
        timeout=timeout,
    )
    # Fail loudly on an HTTP error instead of trying to decode an error page.
    response.raise_for_status()
    return response.json()['code']
【关键词列表页采集】—【排位解析】
# Fields stored for every product found on a keyword search-result page.
asin_info = {
    "title": title,  # product title
    "asin": dataAsin,  # product ASIN
    "price": price,  # price
    "ratting": ratting,  # star rating
    "ratting_num": parseNumber(ratting_num),  # number of ratings
    "goods_type": goods_type,  # sponsored ("A") or organic ("N") placement
    "img_url": pic,  # main image
    "coupon": coupon,  # coupon text
    "deal": deal,  # deal badge text
    "prime": prime,  # Prime member discount text
    "promotion": promotion,
    "bs": bs,
    "ac": ac,
    "sales_num": sale_num,  # sales volume (field added in 2024)
    "uuid": uuid,  # unique identifier of the tile on the page
}
def parseAdsLocation(self, adsItem, Respons, country_code, asins, pageNo):
    """Locate the tracked ASINs on one keyword search-result page.

    Walks every result tile of the page, counts sponsored and organic
    products, collects the listing data of each product, and records the
    first sponsored and the first organic position at which a tracked ASIN
    appears. Special placements (brand ad, video ad, recommendation
    widgets) are not ranked; their label is collected when the widget's
    markup mentions a tracked ASIN.

    Args:
        adsItem: dict-like template record. It is copied for every recorded
            hit; when no tracked ASIN is found it is filled in and returned
            itself.
        Respons: parsed search-result page exposing ``xpath``.
        country_code: marketplace code forwarded to the rating, price and
            title parsers.
        asins: comma-separated string of the ASINs being tracked.
        pageNo: number of the result page being parsed.

    Returns:
        A tuple ``(elements, key_asins, ads_count)``: the hit records (first
        organic hit, then first sponsored hit, or just ``adsItem`` when
        there is no hit), the listing data of every product on the page,
        and the number of sponsored products on the page. ``([], [], 0)``
        is returned when the page has no result container or no tiles.
    """
    # Ordered rules for special placements: the first rule whose markers all
    # occur in the tile's markup wins.
    other_place_rules = (
        (("s-result-item s-widget s-widget-spacing-large AdHolder s-flex-full-width",), '品牌广告'),
        (("sbv-video-single-product",), '视频广告'),
        (("a-size-medium-plus a-color-base", "Amazon’s Choice"), 'AC推荐'),
        (("MAIN-SHOPPING_ADVISER", "HIGHLY RECOMMENDED"), 'ER推荐'),
        (("MAIN-SHOPPING_ADVISER", "Climate Pledge Friendly"), '气候友好保证'),
        (("a-size-medium-plus a-color-base", "Top rated from our brands"), 'TR推荐'),
        (("a-size-medium-plus a-color-base", "Highly rated"), 'HR推荐'),
    )

    adsRankCount = 0  # sponsored products seen so far on this page
    natureRankCount = 0  # organic products seen so far on this page
    BestadsElement = None  # first sponsored hit of a tracked ASIN
    BestnatureElement = None  # first organic hit of a tracked ASIN

    # Bail out when the page has no search-result container at all.
    searchResultElements = Respons.xpath('//span[@data-component-type="s-search-results"]')
    if not searchResultElements:
        return [], [], 0

    # All result tiles of the main slot.
    asinElements = searchResultElements[0].xpath(
        '//div[@class="s-main-slot s-result-list s-search-results sg-row"]/div[@data-uuid!=""]')
    if not asinElements:
        print('没有广告数据====================================')
        return [], [], 0

    # Tracked ASINs as a list; fragments of one or two characters are ignored.
    asd_asin_arr = [asd_asin for asd_asin in asins.split(",") if len(asd_asin) > 2]

    key_asins = []
    otherPlace = []
    for asinElement in asinElements:
        asinElement_text = asinElement.extract()

        # Special placements are labelled, never ranked.
        special_label = None
        for markers, label in other_place_rules:
            if all(marker in asinElement_text for marker in markers):
                special_label = label
                break
        if special_label is not None:
            if any(asd_asin in asinElement_text for asd_asin in asd_asin_arr):
                otherPlace.append(special_label)
            continue

        # Any of the three "Sponsored" label variants marks the tile as an ad.
        is_sponsored = bool(
            asinElement.css('span[class="s-label-popover-default"]')
            or asinElement.css('span[class="puis-label-popover-default"]')
            or asinElement.css('span[class="puis-label-popover-hover"]')
        )

        dataAsin = asinElement.css('div::attr(data-asin)').extract_first()
        # A missing attribute stringifies to "None" (4 chars), so this skips
        # both absent and implausibly short ASINs.
        if len(str(dataAsin)) < 5:
            continue

        # Badge ids look like "B0748W31L5-amazons-choice". The lookup is kept
        # relative to this tile: an absolute "//" path would search the whole
        # page and also match the badge of another tile with the same ASIN.
        amazonsChoice = asinElement.xpath(
            'descendant-or-self::*[@id="' + dataAsin + '-amazons-choice"]')
        bestSeller = asinElement.xpath(
            'descendant-or-self::*[@id="' + dataAsin + '-best-seller"]')
        ac = 1 if amazonsChoice else 0
        bs = 1 if bestSeller else 0

        goods_type = "A" if is_sponsored else "N"
        ratting = self.parseRatting(asinElement, country_code)
        ratting_num = asinElement.css('span[class="a-size-base s-underline-text"]::text').extract_first()
        price = self.parsePrice(asinElement, country_code)
        pic = asinElement.css('img[class="s-image"]::attr(src)').extract_first()
        title = self.parseTitle(asinElement, country_code)
        uuid = asinElement.css('div::attr(data-uuid)').extract_first()
        sale_num = asinElement.css(
            'div[class="a-row a-size-base"] span[class="a-size-base a-color-secondary"]::text').extract_first()

        # Keep the coupon text only when it actually advertises a saving.
        coupon = asinElement.css(
            'span[class="a-size-base s-highlighted-text-padding aok-inline-block s-coupon-highlight-color"]::text'
        ).extract_first() or ""
        if "save" not in coupon.lower():
            coupon = ""

        # Keep the badge text only when it is a deal badge.
        deal = asinElement.css(
            'span[class="a-badge-label-inner a-text-ellipsis"] span::text').extract_first() or ""
        if "deal" not in deal.lower():
            deal = ""

        # The secondary line may carry a Prime offer and/or a promotion.
        promotion = ""
        prime = ""
        secondary = asinElement.css(
            'div[class="a-row a-size-base a-color-secondary"] span::text').extract_first() or ""
        if "prime" in secondary.lower():
            prime = secondary.replace("join ", "").replace("to buy this item ", "")
        if "promotion" in secondary.lower():
            promotion = secondary

        asin_info = {
            "title": title,  # product title
            "asin": dataAsin,  # product ASIN
            "price": price,  # price
            "ratting": ratting,  # star rating
            "ratting_num": parseNumber(ratting_num),  # number of ratings
            "goods_type": goods_type,  # sponsored ("A") or organic ("N")
            "img_url": pic,  # main image
            "coupon": coupon,  # coupon text
            "deal": deal,  # deal badge text
            "prime": prime,  # Prime member discount text
            "promotion": promotion,
            "bs": bs,
            "ac": ac,
            "sales_num": sale_num,  # sales volume (field added in 2024)
            "uuid": uuid,  # unique identifier of the tile on the page
        }

        if is_sponsored:
            adsRankCount += 1
            # Only the first sponsored hit of a tracked ASIN is recorded.
            # `asins` is the raw comma-separated string, so this is a
            # substring test, exactly as before.
            if dataAsin in asins and BestadsElement is None:
                Element = adsItem.copy()
                Element['goods_type'] = "A"
                Element['ads_page_rank'] = adsRankCount
                Element['ads_page_no'] = pageNo
                Element['ads_asin_info'] = json.dumps(asin_info)
                BestadsElement = Element
        else:
            natureRankCount += 1
            # Only the first organic hit of a tracked ASIN is recorded.
            if dataAsin in asins and BestnatureElement is None:
                Element = adsItem.copy()
                Element['goods_type'] = "N"
                Element['nature_page_rank'] = natureRankCount
                Element['nature_page_no'] = pageNo
                Element['nature_asin_info'] = json.dumps(asin_info)
                BestnatureElement = Element
        key_asins.append(asin_info)

    # This method does not extract a total result count; it reports 0.
    all_count = 0

    Elements = [hit for hit in (BestnatureElement, BestadsElement) if hit is not None]
    if not Elements:
        # No tracked ASIN on this page: report the page totals on the
        # template record itself.
        Elements.append(adsItem)

    for Element in Elements:
        Element['search_num'] = all_count
        Element['ads_page_num'] = adsRankCount
        Element['nature_page_num'] = natureRankCount
        Element["other_place"] = otherPlace
    return Elements, key_asins, adsRankCount
入库效果
这里是亚马逊amazon 解析页面评论的python demo
def parseReview_element(self, element, countryCode):
    """Turn one review node of a review page into a flat record.

    Args:
        element: selector for a single review container.
        countryCode: marketplace code stored on the record.

    Returns:
        A dict with the review's id, author, rating, title, body, dates,
        helpful votes and media, or ``None`` when the node has no id or no
        matching ``#customer_review-<id>`` child (the original author's
        note: such nodes are reviews from another marketplace).
    """
    # Unique review id; nodes without one are skipped.
    reviewId = element.css('div::attr(id)').extract_first()
    if reviewId is None:
        return None
    if element.css("#customer_review-" + reviewId).extract_first() is None:
        return None

    # Author id and display name (name capped at 90 characters).
    amazonUserId = self.user(element)
    amazonUserName = str(element.css(".a-profile-name::text").extract_first())[0: 90]

    # Star rating and headline.
    star = self.star(element)
    title = self.title(element)

    # Review date: keep the cleaned original text and the parsed value.
    reviewDateElement = element.css('span[data-hook*="review-date"]').extract_first()
    originReviewDate = html_clear(reviewDateElement)
    date_helper = ReviewDateParseHelper()
    reviewDate = date_helper.reviewDate(originReviewDate)
    # The date parsers fall back to 2099-12-31 when they fail; log the node
    # so the unsupported format can be inspected.
    unparsed_marker = date_helper.LocalDateTimeUtils.parse(
        "2099-12-31", date_helper.LocalDateTimeUtils.YYYY_MM_DD)
    if reviewDate == unparsed_marker:
        logging.debug("解析评论日期出错*********************************")
        logging.debug(element.extract())
        logging.debug("解析评论日期出错*********************************")

    # "Verified Purchase" badge.
    verified = 1 if element.css('span[data-hook*="avp-badge"]').extract_first() else 0

    # Helpful votes: a statement without digits (presumably the spelled-out
    # "One person ..." wording) counts as one vote; no statement means zero.
    helpful_text = element.css('span[data-hook*="helpful-vote-statement"]::text').extract_first()
    if helpful_text:
        digits = "".join(filter(str.isdigit, helpful_text))
        helpful_num = int(digits) if digits else 1
    else:
        helpful_num = 0

    # Review body.
    content = html_clear(element.css('span[data-hook*="review-body"]').extract_first())
    user_info_ext = ",".join(element.css('span[class*="c7yTopDownDashedStrike"]::text').extract())

    # Media: at most one video entry plus the review images.
    videos = []
    if element.css("div[class*='cr-video-desktop']").extract_first():
        videos.append({
            'videoUrl': element.css("input[value$='.mp4']::attr(value)").extract_first(),
            'videoSlateImgUrl': element.css(
                "div[class*='cr-video-desktop']::attr(data-thumbnail-url)").extract_first(),
        })

    pics = list(element.css("div[class*=review-image-tile-section] span a img::attr(src)").extract())
    if not pics:
        # Fall back to the lightbox thumbnails when the tile section is empty.
        pics = list(element.css("div[class*=cr-lightbox-image-thumbnails] img::attr(src)").extract())

    if videos or pics:
        media = json.dumps(
            {
                'videos': videos,
                # De-duplicate while keeping page order, so the stored JSON
                # is deterministic across runs.
                'reviewImageUrls': list(dict.fromkeys(pics)),
            },
            ensure_ascii=False,
        )
    else:
        media = ""
    logging.debug("media:============== %s", media)

    # Drop the video-player placeholder text and flatten line breaks.
    content = content.replace('The media could not be loaded.', " ").strip().replace("\n", " ").replace("\r", " ")

    # One timestamp for both columns, so they can never differ.
    now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    Review = {
        'country_code': countryCode,
        'review_id': reviewId,
        'amazon_user_id': amazonUserId,
        'amazon_user_name': amazonUserName,
        'verified': verified,
        'star': star,
        'title': title,
        'content': content,
        'helpful_num': helpful_num,
        'review_date': reviewDate,
        'user_info_ext': user_info_ext,
        'origin_review_date': originReviewDate,
        'media': media,
        'create_time': now,
        'update_time': now,
    }
    return Review
亚马逊日期采集是一个非常繁琐的事情,涵盖全部站点,所有语言US FR DE IT ES SE AE 包括阿拉伯语等,特别是阿拉伯语,最让人吐血
如下是解析US UK AU CA MX JP站点日期的代码,大家可以参考,然后去解析其他站点的数据。
日期解析只能一个站点一个站点的解析,没有其他技巧
def reviewDateUK(self, reviewDate):
    """Parse a day-first English review date, e.g. ``... on 26 February 2016``.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Drop everything up to the last "on " and any commas, then read
        # day, month name and year; split() tolerates repeated spaces.
        day, month, year = re.sub(r".*on ", "", reviewDate).replace(",", "").split()[:3]
        candidate = year + "-" + utils.transferMonth(month) + "-" + day
        parsed = utils.parse(candidate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(candidate, utils.YYYY_MM_DD)
        return parsed
    except Exception:
        # Any malformed input maps to the sentinel; interpreter-level signals
        # such as KeyboardInterrupt are deliberately not swallowed.
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)
"""
* US->on February 26, 2016
"""
def reviewDateUS(self, reviewDate):
    """Parse a month-first English review date, e.g. ``on February 26, 2016``.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Drop everything up to the last "on " and any commas, then read
        # month name, day and year; split() tolerates repeated spaces.
        month, day, year = re.sub(r".*on ", "", reviewDate).replace(",", "").split()[:3]
        candidate = year + "-" + utils.transferMonth(month) + "-" + day
        parsed = utils.parse(candidate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(candidate, utils.YYYY_MM_DD)
        return parsed
    except Exception:
        # Any malformed input maps to the sentinel; interpreter-level signals
        # such as KeyboardInterrupt are deliberately not swallowed.
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)
"""
* AU->on 26 February, 2016
"""
def reviewDateAU(self, reviewDate):
    """Parse a day-first English review date, e.g. ``on 26 February, 2016``.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Drop everything up to the last "on " and any commas, then read
        # day, month name and year; split() tolerates repeated spaces.
        day, month, year = re.sub(r".*on ", "", reviewDate).replace(",", "").split()[:3]
        candidate = year + "-" + utils.transferMonth(month) + "-" + day
        parsed = utils.parse(candidate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(candidate, utils.YYYY_MM_DD)
        return parsed
    except Exception:
        # Any malformed input maps to the sentinel; interpreter-level signals
        # such as KeyboardInterrupt are deliberately not swallowed.
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)
"""
* US->on February 26, 2016
* CA->on March 7, 2016
"""
def reviewDateCA(self, reviewDate):
    """Parse a month-first English review date, e.g. ``on March 7, 2016``.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Drop everything up to the last "on " and any commas, then read
        # month name, day and year; split() tolerates repeated spaces.
        month, day, year = re.sub(r".*on ", "", reviewDate).replace(",", "").split()[:3]
        candidate = year + "-" + utils.transferMonth(month) + "-" + day
        parsed = utils.parse(candidate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(candidate, utils.YYYY_MM_DD)
        return parsed
    except Exception:
        # Any malformed input maps to the sentinel; interpreter-level signals
        # such as KeyboardInterrupt are deliberately not swallowed.
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)
"""
* US->on February 26, 2016
* MX->el 8 de noviembre de 2016
"""
def reviewDateMX(self, reviewDate):
    """Parse a Spanish review date, e.g. ``... el 8 de noviembre de 2016``.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Drop everything up to the last "el ", then split the remaining
        # "<day> de <month> de <year>" on its separators.
        pieces = re.sub(r".*el ", "", reviewDate).split(" de ")
        # Strip each piece so stray surrounding whitespace cannot break parsing.
        day, month, year = pieces[0].strip(), pieces[1].strip(), pieces[2].strip()
        candidate = year + "-" + utils.transferMonth(month) + "-" + day
        parsed = utils.parse(candidate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(candidate, utils.YYYY_MM_DD)
        return parsed
    except Exception:
        # Any malformed input maps to the sentinel; interpreter-level signals
        # such as KeyboardInterrupt are deliberately not swallowed.
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)
"""
*
* @param reviewDate
"""
def reviewDateJP(self, reviewDate):
    """Parse a Japanese-marketplace review date, e.g. ``2021年3月18日に日本でレビュー済み``.

    Both the Japanese wording and the Chinese-language wording of the
    review-date line are handled.

    Args:
        reviewDate: text of the review-date element.

    Returns:
        The result of ``self.LocalDateTimeUtils.parse`` for the date, or the
        parsed sentinel date ``2099-12-31`` when the text cannot be handled.
    """
    utils = self.LocalDateTimeUtils
    try:
        # Remove the "reviewed in Japan" wording first: it contains the
        # character 日, which is also the day marker stripped further down.
        for phrase in ("に本でレビュー済み", "に日本でレビュー済み", "评论于", "在日本 🇯🇵 发布"):
            reviewDate = reviewDate.replace(phrase, "")
        # "2021年3月18日" -> "2021-3-18"
        reviewDate = reviewDate.replace("年", "-").replace("月", "-").replace("日", "").strip()
        parsed = utils.parse(reviewDate, utils.YYYY_M_D)
        if parsed is None:
            parsed = utils.parse(reviewDate, utils.YYYY_MM_DD)
        return parsed
    except Exception as e:
        # Report the failure and map the input to the sentinel; interpreter-
        # level signals such as KeyboardInterrupt are deliberately not swallowed.
        print("s",e,"s")
        return utils.parse("2099-12-31", utils.YYYY_MM_DD)