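I really enjoyed 《千里之外》 from bilibili's 2019 New Year Festival (拜年祭), so I decided to scrape its comments using the crawling techniques from chapter 7 of 《python爬虫开发从入门到实战》. Looking at the page source and the Network panel showed that the comments are loaded asynchronously via AJAX. Requesting the API URL directly fails unless you forge the request headers, which is a bit of a hassle. (Forging the headers is actually very easy; the tedious part is pulling the results out of the JSON string, which is far less convenient than extracting them from the page with XPath. See the ajax_get_comment method. CrawlerUtility comes from https://github.com/kingname/CrawlerUtility; thanks to 青南 for the handy tool, which makes parsing headers much easier.)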
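To give a feel for the JSON route mentioned above, here is a minimal sketch of unwrapping a JSONP response and pulling out user names and messages. The sample payload and its field names (data, replies, member.uname, content.message) are made up for illustration, and strip_jsonp is a hypothetical helper, not part of the script in this post.

```python
import json


def strip_jsonp(body: str) -> dict:
    """Return the JSON object wrapped inside a JSONP response such as cb({...})."""
    start = body.find("(")
    end = body.rfind(")")
    if start == -1 or end == -1 or end < start:
        raise ValueError("not a JSONP response")
    return json.loads(body[start + 1:end])


# Made-up sample response; the field names are assumptions, not a documented schema.
sample = (
    'jQuery123_456({"code": 0, "data": {"replies": ['
    '{"member": {"uname": "alice"}, "content": {"message": "Great show"}}'
    ']}})'
)

payload = strip_jsonp(sample)
# Guard against a missing or null "data"/"replies" field.
replies = (payload.get("data") or {}).get("replies") or []
comments = [(r["member"]["uname"], r["content"]["message"]) for r in replies]
print(comments)
```

Every field has to be dug out of nested dictionaries like this, which is the part I found more tedious than a single XPath expression.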
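So I decided to scrape the comments with selenium instead. For the first page, you only need to open the page and locate the target elements. The catch is that each element has to finish loading before it can be read, so I wrapped the wait-then-fetch logic in a helper function. Its parent argument can be the driver or a page element (passing None falls back to self.driver), and find_method is the name of a lookup method on that object, such as find_element_by_xpath or find_elements_by_xpath: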
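def wait_until(self, parent, xpath, find_method):
    # Fall back to the driver when no parent element is given.
    driver = parent or self.driver

    def find(driver):
        # Look up the finder method by name and call it with the XPath.
        element = attrgetter(find_method)(driver)(xpath)
        # An empty result is falsy, so WebDriverWait keeps polling.
        if element:
            return element
        else:
            return False

    try:
        element = WebDriverWait(driver, self.TIME_OUT).until(find)
        return element
    except TimeoutException as _:
        raise TimeoutException('Too slow')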
It can be used like this:
total_page = self.wait_until(None, "//div[@class='header-page paging-box']/span[@class='result']",
                             self.FIND_ELEMENT_BY_XPATH).text
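Under the hood, WebDriverWait.until keeps calling the function it is given until the result is truthy or the timeout expires. The stdlib-only sketch below mimics that loop so it can be run without a browser. poll_until, WaitTimeout and FakePage are hypothetical stand-ins, not Selenium classes, and the real implementation also ignores certain exceptions between polls.

```python
import time


class WaitTimeout(Exception):
    """Raised when the condition never becomes truthy (stand-in for TimeoutException)."""


def poll_until(condition, timeout=2.0, interval=0.05):
    """Call condition() repeatedly until it returns something truthy or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        result = condition()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise WaitTimeout(f"condition not met within {timeout}s")
        time.sleep(interval)


class FakePage:
    """Pretends to be a page whose comment list only shows up on the third lookup."""

    def __init__(self):
        self.calls = 0

    def find_elements_by_xpath(self, xpath):
        self.calls += 1
        return ["comment-1", "comment-2"] if self.calls >= 3 else []


page = FakePage()
# Look the method up by name, the same way wait_until receives find_method as a string.
finder = getattr(page, "find_elements_by_xpath")
found = poll_until(lambda: finder("//div[@class='comment']"))
print(found, page.calls)
```

Because an empty list is falsy, a find_elements-style lookup keeps polling until at least one element matches, which is why wait_until works for both the singular and plural finders.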
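Clicking "next page" does not reload the page, which means the data is fetched and rendered via AJAX. So the scraper has to locate the "next page" button, click it, and scrape again once the new page is in. A WebDriverWait(...).until(...) call waits for the button to become clickable before clicking it:
def _goto_next_page(self):
    driver = self.driver
    next_page_path = '//div[@class=\'header-page paging-box\']/a[@class="next"]'
    # Block until the "next" link is present, visible and enabled.
    WebDriverWait(driver, self.TIME_OUT).until(EC.element_to_be_clickable((
        By.XPATH,
        next_page_path))
    )
    next_page = driver.find_element_by_xpath(next_page_path)
    next_page.click()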
The loop that keeps scraping until the last page can be written like this:
while True:
    current_page = self.get_single_page_comments()
    if current_page == self.total_page:
        break
    self._goto_next_page()
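While scraping I kept running into the error "element is not attached to the page document", even with the waits in place. I later found that scrolling to the bottom of the page first reduces the errors, though it does not eliminate them: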
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
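If the error still occurred, it seemed to be caused by paging too quickly. I could not understand why it failed even after waiting, but I found a workaround: jump to the same page again and scrape it once more. The method for jumping to a given page is shown below (I later tracked down the real cause; see the follow-up note further down):
def _goto_page(self, page):
    driver = self.driver
    path = "//div[@class='page-jump']/input"
    self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
    WebDriverWait(driver, self.TIME_OUT).until(EC.presence_of_element_located((
        By.XPATH,
        path))
    )
    # Type the page number into the jump box and press Enter.
    elem = driver.find_element_by_xpath(path)
    elem.clear()
    elem.send_keys(str(page))
    elem.send_keys(Keys.RETURN)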
After re-entering the page, sleep for 1 second so that all the page elements have time to load:
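from time import sleep

times = 0
while times < self.RETRY_TIMES:
    try:
        self._receive_current_page(current_page)
        break
    except Exception:
        # Re-enter the same page, give it a second to load, then try again.
        print(f'Re-entering page {current_page}')
        self._goto_page(current_page)
        sleep(1)
        times += 1
else:
    # Runs only when every attempt failed (the loop never hit break).
    print(f'page {current_page} was not fully scraped')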
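[Follow-up note] Some more googling turned up an explanation (this is Selenium's stale element reference error, see [5]): the error occurs because the child elements grabbed earlier are no longer on the page by the time they are iterated. Right after a page turn, the lookup still returns the previous page's items, and the current page's items only replace them a moment later. So there is no need to re-enter the page; waiting briefly and scraping again with retry solves the problem:
@retry(reraise=True, stop=stop_after_attempt(10), before_sleep=my_before_sleep)
def _receive_current_page(self, current_page):
    print(f'page {current_page}')
    elements = self.wait_until(None, '//div[starts-with(@class, "list-item reply-wrap")]',
                               self.FIND_ELEMENTS_BY_XPATH)
    print(f'len({current_page})={len(elements)}')
    for ele in elements:
        # A stale element raises here; the exception propagates so that @retry re-runs the method.
        comment = self._get_one_comment(ele)
        self.comments[f'page{current_page}'].append(comment)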
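my_before_sleep is called before each retry; it clears the data already collected for that page so that comments are not duplicated. (As written, the decorator passes no wait argument, so tenacity retries immediately; adding something like wait=wait_fixed(1) would give the page a moment to settle first.)
def my_before_sleep(retry_state):
    """Called by tenacity before each retry."""
    self = retry_state.args[0]  # args are the positional arguments of the decorated method
    current_page = retry_state.args[1]
    self.comments.pop(f'page{current_page}', None)
    print('Retrying %s: attempt %s ended with: %s' % (retry_state.fn, retry_state.attempt_number, retry_state.outcome))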
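To show why the cleanup hook matters, here is a stdlib-only sketch of the same idea: retry a function, and before each new attempt discard the partial results and pause briefly. retry_with_cleanup is a hand-rolled, hypothetical stand-in for tenacity's retry (it is not tenacity's API), and the failure is simulated rather than coming from a browser.

```python
import time
from collections import defaultdict
from functools import wraps


def retry_with_cleanup(attempts, wait_seconds, before_sleep):
    """Retry the wrapped function; call before_sleep(args) and pause between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    before_sleep(args)
                    time.sleep(wait_seconds)
        return wrapper
    return decorator


comments = defaultdict(list)
failures_left = {"count": 2}  # simulate two stale reads before the page settles


def drop_partial_page(args):
    """Discard whatever was collected for this page before the next attempt."""
    (page,) = args
    comments.pop(f"page{page}", None)


@retry_with_cleanup(attempts=5, wait_seconds=0.01, before_sleep=drop_partial_page)
def collect(page):
    for i in range(3):
        comments[f"page{page}"].append(f"comment {i}")
        if i == 1 and failures_left["count"] > 0:
            failures_left["count"] -= 1
            raise RuntimeError("element is not attached to the page document")


collect(7)
print(dict(comments))
```

Without the cleanup step, the two failed attempts would each leave two comments behind, and the page would end up with seven entries instead of three.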
With that, the scraping flow for a single page simplifies to:
def get_single_page_comments(self):
    self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
    current_page = self._get_current_page()
    self._get_total_page()
    self._receive_current_page(current_page)
    return current_page
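With all of the above I finally managed to scrape all 111 pages. The complete code follows. Several of the methods are wrapped in the retry decorator, which raises the success rate by simply trying again. The scraped data lives in the dictionary self.comments: each key is a page label (page1, page2, and so on) and each value is the list of comments on that page. If a page has to be scraped again, remember to pop it first (as mentioned above).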
import json
import os
import re
import time
from collections import defaultdict
from operator import attrgetter

import requests
from CrawlerUtility import ChromeHeaders2Dict
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tenacity import retry, stop_after_attempt


def my_before_sleep(retry_state):
    """Called by tenacity before each retry."""
    self = retry_state.args[0]  # args are the positional arguments of the decorated method
    current_page = retry_state.args[1]
    self.comments.pop(f'page{current_page}', None)
    print('Retrying %s: attempt %s ended with: %s' % (retry_state.fn, retry_state.attempt_number, retry_state.outcome))


class BilibiliSpider:
    HOME_URL = 'https://www.bilibili.com/blackboard/bnj2019.html?spm_id_from=333.334.b_72616e6b696e675f646f756761.4&aid=36570507&p='
    COMMENT_LIST_API = 'https://api.bilibili.com/x/v2/reply?callback=jQuery172037695699199400234_1549378739473&jsonp=jsonp&pn=1&type=1&oid=36570507&sort=0&_=1549378775254'
    HEADERS = """
Accept: */*
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7
Connection: keep-alive
Cookie: _uuid=3C34470A-8E07-EB73-821A-8C9296CE917C39306infoc; buvid3=38E4616F-6B80-4E47-830B-8BAD5C4EB5BB6688infoc; stardustvideo=1; fts=1539694104; rpdid=lmsiqospkdosklxlpopw; im_notify_type_1483693=0; CURRENT_FNVAL=16; UM_distinctid=166ab725293e7-044e2ac2d71b1e-1f396652-13c680-166ab7252943f5; sid=7wuz88lx; LIVE_BUVID=0579974c1f7c3c2575fe88d4443faa29; LIVE_BUVID__ckMd5=c5e8ececd91bd208; DedeUserID=1483693; DedeUserID__ckMd5=0bdf06a909d01153; SESSDATA=a2280675%2C1550158353%2C2abfd611; bili_jct=a9180d3b91a2e87f2ad305187aa98ff2; finger=e4810d01; _ga=GA1.2.540612000.1548599946; _gid=GA1.2.1043200605.1548599946; gr_user_id=f0122614-4e80-490c-93b7-2856cbd7a8ac; grwng_uid=890e5ba7-1aa0-490b-a21c-aa9d0cefab49; CURRENT_QUALITY=32; bp_t_offset_1483693=216899400389846136; _dfcaptcha=4ca4802a95feba42a36d95ad00f25022
Host: api.bilibili.com
Referer: https://www.bilibili.com/blackboard/bnj2019.html?spm_id_from=333.334.b_72616e6b696e675f646f756761.4&p=&aid=36570536
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36
"""
    FIND_ELEMENTS_BY_XPATH = 'find_elements_by_xpath'
    FIND_ELEMENT_BY_XPATH = 'find_element_by_xpath'
    TIME_OUT = 50
    RETRY_TIMES = 10
    driver = None
    total_page = None

    def __init__(self):
        self.start = time.time()
        self.comments = defaultdict(list)

    def __enter__(self):
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument("--headless")
        # Selenium 3 API; Selenium 4 replaces these keyword arguments and the find_element_by_* helpers.
        self.driver = webdriver.Chrome(chrome_options=chrome_options, executable_path='./chromedriver')
        # self.driver = webdriver.PhantomJS('./phantomjs')
        return self

    def __exit__(self, type, value, traceback):
        print()
        print(f'elapse time: {time.time() - self.start}')
        print('total pages', len(self.comments))
        print('total comments', sum(len(page) for page in self.comments.values()))
        if self.driver:
            self.driver.quit()

    @property
    def headers(self):
        return ChromeHeaders2Dict(self.HEADERS)

    def ajax_get_comment(self):
        resp = requests.get(self.COMMENT_LIST_API, headers=self.headers).content.decode()
        # Strip the JSONP wrapper: keep everything from the first '{"' up to the closing ')'.
        json_str = resp[resp.find('{"'):-1]
        data = json.loads(json_str).get('data')
        print(data)

    def wait_until(self, parent, xpath, find_method):
        # Fall back to the driver when no parent element is given.
        driver = parent or self.driver

        def find(driver):
            element = attrgetter(find_method)(driver)(xpath)
            if element:
                return element
            else:
                return False

        try:
            element = WebDriverWait(driver, self.TIME_OUT).until(find)
            return element
        except TimeoutException as _:
            raise TimeoutException('Too slow')

    def driver_get_comments(self):
        driver = self.driver
        driver.get(self.HOME_URL)
        while True:
            current_page = self.get_single_page_comments()
            if current_page == self.total_page:
                break
            self._goto_next_page()
        self.save_to_file()

    def save_to_file(self, filename='comments.md'):
        if os.path.exists(filename):
            os.remove(filename)
        with open(filename, 'a', encoding='utf-8') as f:
            for page, comment_list in self.comments.items():
                f.write(f'- {page}\n')
                # The key already carries the "page" prefix.
                print(f'len({page})={len(comment_list)}')
                for comment in comment_list:
                    f.write(f' - {comment}\n')

    @retry(reraise=True, stop=stop_after_attempt(RETRY_TIMES))
    def _goto_next_page(self):
        driver = self.driver
        next_page_path = '//div[@class=\'header-page paging-box\']/a[@class="next"]'
        WebDriverWait(driver, self.TIME_OUT).until(EC.element_to_be_clickable((
            By.XPATH,
            next_page_path))
        )
        next_page = driver.find_element_by_xpath(next_page_path)
        next_page.click()

    @retry(reraise=True, stop=stop_after_attempt(RETRY_TIMES))
    def _goto_page(self, page):
        driver = self.driver
        path = "//div[@class='page-jump']/input"
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
        WebDriverWait(driver, self.TIME_OUT).until(EC.presence_of_element_located((
            By.XPATH,
            path))
        )
        elem = driver.find_element_by_xpath(path)
        elem.clear()
        elem.send_keys(str(page))
        elem.send_keys(Keys.RETURN)

    @retry(reraise=True, stop=stop_after_attempt(RETRY_TIMES))
    def _get_total_page(self):
        if not self.total_page:
            total_page = self.wait_until(None, "//div[@class='header-page paging-box']/span[@class='result']",
                                         self.FIND_ELEMENT_BY_XPATH).text
            # The pager text has the form '共N页' (N pages in total).
            total_page = re.search(r'共(\d+)页', total_page).group(1)
            total_page = int(total_page)
            self.total_page = total_page
            print(f'{total_page} pages in total')

    @retry(reraise=True, stop=stop_after_attempt(RETRY_TIMES))
    def _get_current_page(self):
        current_page = self.wait_until(None, "//div[@class='header-page paging-box']/span[@class=\"current\"]",
                                       self.FIND_ELEMENT_BY_XPATH).text
        current_page = int(current_page)
        return current_page

    def get_single_page_comments(self):
        self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
        current_page = self._get_current_page()
        self._get_total_page()
        self._receive_current_page(current_page)
        return current_page

    @retry(reraise=True, stop=stop_after_attempt(RETRY_TIMES), before_sleep=my_before_sleep)
    def _receive_current_page(self, current_page):
        elements = self.wait_until(None, '//div[starts-with(@class, "list-item reply-wrap")]',
                                   self.FIND_ELEMENTS_BY_XPATH)
        print(f'page {current_page} = {len(elements)} comments')
        for ele in elements:
            # A stale element raises here; the exception propagates so that @retry re-runs the method.
            comment = self._get_one_comment(ele)
            self.comments[f'page{current_page}'].append(comment)

    def _get_one_comment(self, ele):
        user = self.wait_until(ele, 'div/div[@class="user"]/a', self.FIND_ELEMENT_BY_XPATH).text
        comment_time = self.wait_until(ele, "div/div[@class='info']/span[@class='time']",
                                       self.FIND_ELEMENT_BY_XPATH).text
        content = self.wait_until(ele, 'div/p[@class="text"]', self.FIND_ELEMENT_BY_XPATH).text
        # Output format: "<user> said at <time>: <content>" (kept in Chinese as in the saved file).
        comment = f'{user}于{comment_time}说: {content}'
        return comment


if __name__ == '__main__':
    with BilibiliSpider() as spider:
        spider.driver_get_comments()

    # spider = BilibiliSpider()
    # spider.ajax_get_comment()
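The final results are saved to a file (comments.md by default), with each page label followed by that page's comments.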
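I later noticed that four pages were missing from the scraped data, apparently because a few pages were skipped when paging too fast. This could be fixed by checking that the page numbers are consecutive, but I did not bother to optimize further, since reading the data from the API is really much simpler and faster. Still, the exercise was good practice in handling exceptions.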
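For what it's worth, the continuity check could look something like the sketch below. It assumes the keys follow the page{n} pattern used by self.comments; missing_pages is a hypothetical helper that is not part of the script above, and the sample dictionary is made up.

```python
def missing_pages(comments, total_pages):
    """Return the page numbers in 1..total_pages that have no entry in comments."""
    seen = {int(key[len("page"):]) for key in comments}
    return [n for n in range(1, total_pages + 1) if n not in seen]


# Made-up sample: pages 3 and 5 were skipped.
sample = {"page1": ["a"], "page2": ["b"], "page4": ["c"], "page6": ["d"]}
gaps = missing_pages(sample, 6)
print(gaps)  # [3, 5]
```

Each missing number could then be passed to _goto_page and scraped again with get_single_page_comments, though I have not tried this against the live page.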
References
[1] https://jeffknupp.com/blog/2016/03/07/improve-your-python-the-with-statement-and-context-managers/
[2] https://blog.csdn.net/u013250416/article/details/61425207
[3] https://cuiqingcai.com/2599.html
[4] 《python爬虫开发从入门到实战》 (谢乾坤)
[5] https://www.seleniumhq.org/exceptions/stale_element_reference.jsp
[6] A crawler written by 董大 six years ago