知乎是一个真实的网络问答社区,社区氛围友好与理性,连接各行各业的精英。用户分享着彼此的专业知识、经验和见解,为中文互联网源源不断地提供高质量的信息。
准确地讲,知乎更像一个论坛:用户围绕着某一感兴趣的话题进行相关的讨论,同时可以关注兴趣一致的人。对于概念性的解释,网络百科几乎涵盖了你所有的疑问;但是对于发散思维的整合,却是知乎的一大特色。
为了膜拜“高学历、高收入、高消费”的大佬们学习,本鶸尝试用Scrapy
模拟登录并爬取知乎上的问题以及其回答。
模拟登录
在使用Scrapy
模拟登录之前,有过使用requests
模拟登录的经历,其中用session
和cookies
帮我节约了不少时间。
在使用到Scrapy
模拟登录时,需要使用到Scrapy
自己的Request
在模拟登录的过程中,首先需要修改Scrapy
默认的User-Agent
,并且向登录的URL POST
所需要的数据。通过查看页面和chrome开发者工具
中的network,可以得到我们需要POST
的URL以及数据。
headers={
"HOST":"www.zhihu.com",
"Referer":"https://www.zhihu.com",
"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:55.0) Gecko/20100101 Firefox/55.0"
}
Scrapy
默认的User-Agent
是无法爬取知乎这类有一定反爬虫的网站的,所以我们需要添加自己的headers
既然要模拟登录,需要向登录页面POST
的数据肯定是不能少的。
import re
account = input("请输入账号\n--->")
password = input("请输入密码\n--->")
_xsrf = response.xpath('/html/body/div[1]/div/div[2]/div[2]/form/input/@value').extract_first()
if re.match("^1\d{10}", account):
print("手机号码登录")
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": _xsrf,
"phone_num": account,
"password": password,
"captcha":""
}
else:
if "@" in account:
# 判断用户名是否为邮箱
print("邮箱方式登录")
post_url = "https://www.zhihu.com/login/email"
post_data = {
"_xsrf": _xsrf,
"email": account,
"password": password,
"captcha":""
}
通过正则表达式判断你输入的账号是手机号还是email
。知乎对账号登录POST
的地址会根据手机或email
会有不同。
-
_xsrf
是藏在登录页面中的一组随机密钥,可以使用正则或者Scrapy
自己的XPath或者CSS选择器从页面提取出来 -
captcha
就是验证码了。在登录时知乎会要求输入验证码。
具体模拟登录源码如下:
import scrapy
import re
from PIL import Image
import json
from urllib import parse
class ZhihuSpider(scrapy.Spider):
name = "zhihu"
allowed_domains=["www.zhihu.com"]
start_urls = ['https://www.zhihu.com/explore']
headers={
"HOST":"www.zhihu.com",
"Referer":"https://www.zhihu.com",
"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:55.0) Gecko/20100101 Firefox/55.0",
}
def parse(self, response):
pass
def start_requests(self):
#因为要登录后才能查看知乎,所以要重写入口
return [scrapy.Request("https://www.zhihu.com/#signin",headers=self.headers,callback=self.login)]
def login(self,response):
_xsrf = response.xpath('/html/body/div[1]/div/div[2]/div[2]/form/input/@value').extract_first()
account = input("请输入账号\n--->")
password = input("请输入密码\n--->")
if re.match("^1\d{10}", account):
print("手机号码登录")
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": _xsrf,
"phone_num": account,
"password": password,
"captcha":""
}
else:
if "@" in account:
# 判断用户名是否为邮箱
print("邮箱方式登录")
post_url = "https://www.zhihu.com/login/email"
post_data = {
"_xsrf": _xsrf,
"email": account,
"password": password,
"captcha":""
}
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
meta={"post_data": post_data,
"post_url": post_url,
},
callback=self.check_login
)]
def login_after_captcha(self,response):
#获取验证码
print(response.headers)
post_data = response.meta.get("post_data","")
post_url = response.meta.get("post_url","")
with open('captcha.gif', 'wb') as f:
f.write(response.body)
try:
im = Image.open("captcha.gif")
im.show()
captcha = input("please input the captcha:")
post_data["captcha"] = captcha
except:
print("未打开验证码文件")
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
callback=self.check_login,
)]
def check_login(self,response):
response_text = json.loads(response.body)
if response_text["r"] == 0:
headers = response.headers
cookie = dict(headers)[b'Set-Cookie']
cookie = [str(c, encoding="utf-8") for c in cookie]
cookies = ";".join(cookie)
#登录成功后才开始使用start_urls
for url in self.start_urls:
yield scrapy.Request(url,headers=self.headers,dont_filter=True)
else:
captcha_url = "https://www.zhihu.com/captcha.gif?&type=login"
#因为scrapy是一个异步框架,所以为了保证验证码在同一个session下,就将这个request yield出去
yield scrapy.Request(url=captcha_url,
headers=self.headers,
meta={"post_data":response.meta.get("post_data"),
"post_url":response.meta.get("post_url"),
},
callback=self.login_after_captcha)
登录后,整个知乎就在你眼前了。
数据的爬取
如何遍历一个网站的所有我们需要的网页?这是一个很麻烦的问题,一般会选择深度优先遍历(DFS)
或者广度优先遍历(BFS)
。我试着利用Scrapy
的异步机制,用DFS
一直跟踪、下载我所能接触到的URL,这样总会将所有我需要的URL遍历一次。
def parse(self, response):
"""
提取出check_login中yield中的URL即为我提取知乎URL的一个入口
将其中所有的URL中类似/question/xxxx的URL提取出来,然后下载后放入解析函数
:param response:
:return:
"""
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls = filter(lambda x:True if x.startswith("https") else False,all_urls)
for url in all_urls:
print(url)
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*",url)
#如果提取到question的URL则进行下载
if match_obj:
request_url = match_obj.group(1)
question_id = match_obj.group(2)
yield scrapy.Request(request_url,
headers=self.headers,
meta={"question_id":question_id},
callback=self.parse_question)
# 如果提取到的不是question的URL,则进行跟踪
else:
yield scrapy.Request(url,headers=self.headers,callback=self.parse)
这样的找寻URL的逻辑在question页面也可以使用。将找到的形如/question/...
的URL交给专门处理question页面的函数进行处理。
from ..items import ZhihuAnswerItem
def parse_question(self,response):
"""
处理question页面,从页面中取出我们需要的item
:param response:
:return:
"""
question_id = response.meta.get("question_id")
if "QuestionHeader-title" in response.text:
#知乎的新版本
item_loader = ItemLoader(item=ZhihuQuestionItem(),response=response)
item_loader.add_css("title",".QuestionHeader-main .QuestionHeader-title::text")
item_loader.add_css("topics",".TopicLink .Popover div::text")
item_loader.add_css("content",".QuestionHeader-detail")
item_loader.add_value("url",response.url)
item_loader.add_value("zhihu_id",int(response.meta.get("question_id","")))
item_loader.add_css("answer_num",".List-headerText span::text")
item_loader.add_css("watch_user_num",'.NumberBoard-value::text')
item_loader.add_css("click_num",'.NumberBoard-value::text')
item_loader.add_css("comments_num",'.QuestionHeader-Comment button::text')
QuestionItem = item_loader.load_item()
#请求该问题的回答,这个URL会在后面给出。
yield scrapy.Request(self.start_answer_urls.format(question_id,20,0),headers=self.headers,callback=self.parse_answer)
yield QuestionItem
#在question页面中找question的URL.可有可无,主要是上面提取数据的逻辑
all_urls = response.css("a::attr(href)").extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
for url in all_urls:
print(url)
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
# 如果提取到question的URL则进行下载
if match_obj:
request_url = match_obj.group(1)
question_id = match_obj.group(2)
yield scrapy.Request(request_url,
headers=self.headers,
meta={"question_id": question_id},
callback=self.parse_question)
# 如果提取到的不是question的URL,则进行跟踪
else:
# pass
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
else:
#知乎的老版本
pass
知乎为我们开放了获取回答的一个公共信息的API。
点击之后,给我们展示的是一个json
里面会给我们很多有用的信息,比如
paging
里面的
-
is_end
是判断该页的回答是否是该问题最后的回答 -
totals
是显示该问题所有的回答 -
next
是爬取知乎回答最重要的一个数据。它算是我们爬取知乎问题的一个入口,它有三个重要的数据,question/xxxxxx/....
表明我们可以通过question_id
来找到该问题的回答;limit
即为每页回答的数量;offset
是偏移量,表示页面回答在所有回答中偏移位置。
后面的数据中可以看到许多我们需要的数据。(我随便开的一个json,不小心截图到谁了请找我。)
class ZhihuSpider(scrapy.Spider):
....
#answer第一页的请求URL
start_answer_urls = "http://www.zhihu.com/api/v4/questions/{0}/answers?" \
"sort_by=default&include=data%5B%2A%5D.is_normal%2Cadmin_closed_comment%2" \
"Creward_info%2Cis_collapsed%2Cannotation_action%2Cannotation_detail%2Ccollapse_reason%2" \
"Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2" \
"Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Ccreated_time%2Cupdated_time%2Creview_info%2" \
"Cquestion%2Cexcerpt%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2" \
"Cupvoted_followees%3Bdata%5B%2A%5D.mark_infos%5B%2A%5D.url%3Bdata%5B%2A%5D.author.follower_count%2Cbadge%" \
"5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}"
def parse_answer(self,response):
answer_json = json.loads(response.text)
is_end = answer_json["paging"]["is_end"]
total_anwsers = answer_json["paging"]["totals"]
next_url = answer_json["paging"]["next"]
AnswerItem = ZhihuAnswerItem()
#提取answer的结构
for answer in answer_json.get("data"):
AnswerItem["zhihu_id"] = answer["id"]
AnswerItem["url"] = answer["url"]
AnswerItem["question_id"] = answer["question"]["id"]
AnswerItem["author_id"] = answer["author"]["id"] if "id" in answer["author"] and answer["author"]["id"] is not "0" else None
AnswerItem["author_name"] = answer["author"]["name"] if "id" in answer["author"] and answer["author"]["id"] is not "0" else "匿名用户"
AnswerItem["content"] = answer["content"] if "content" in answer else None
AnswerItem["praise_num"] = answer["voteup_count"]
AnswerItem["comments_num"] = answer["comment_count"]
AnswerItem["update_time"] = answer["updated_time"]
AnswerItem["create_time"] = answer["created_time"]
AnswerItem["crawl_time"] = datetime.datetime.now()
yield AnswerItem
if not is_end:
yield scrapy.Request(next_url,headers=self.headers,callback=self.parse_answer)