1. Debugging with scrapy shell
When debugging with the shell, calling it directly:
scrapy shell https://www.zhihu.com/question/58765535
returns a 500 error, because no headers are sent. The correct way is:
scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36" www.zhihu.com/question/58765535
Now the page can be analyzed inside the shell.
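Once the shell is open, the selectors used later in the spider can be tried out interactively; for example (the class names below are the ones used in the spider code in section 2, and they may change whenever Zhihu updates its pages):
>>> response.css("h1.QuestionHeader-title::text").extract_first()
>>> response.css(".QuestionHeader-topics .Popover div::text").extract()
>>> response.css(".NumberBoard-value::text").extract()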
2. Obtaining the links to analyze
After logging in and opening the home page, the spider uses a depth-first approach: it collects the links it needs from the home page, opens those links to collect the links inside them, and keeps repeating this until all content has been covered.
from urllib import parse

def parse(self, response):
    # There is no specific entry point, so traverse the site's links depth-first
    all_urls = response.css("a::attr(href)").extract()
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    for url in all_urls:
        pass
Debugging shows that links containing javascript are the ones that need to be removed.
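A minimal sketch of that filtering step; the full spider below does the same thing by keeping only https links, which also discards javascript: pseudo-links:
all_urls = [url for url in all_urls if url.startswith("https")]  # drops javascript:; and other non-https links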
# -*- coding: utf-8 -*-
import re
import json
import datetime
from urllib import parse

import scrapy
from scrapy.loader import ItemLoader

from articlespider.items import ZhihuQuestionItem, ZhihuAnswerItem


class ZhihuSpider(scrapy.Spider):
    name = "zhihu"
    allowed_domains = ["www.zhihu.com"]
    start_urls = ['http://www.zhihu.com/']

    # Answer API; {0} is the question id, {1} the page size, {2} the offset
    start_answer_url = 'https://www.zhihu.com/api/v4/questions/{0}/answers?sort_by=default&include=data%5B%2A%5D.is_normal%2Cis_sticky%2Ccollapsed_by%2Csuggest_edit%2Ccomment_count%2Ccan_comment%2Ccontent%2Ceditable_content%2Cvoteup_count%2Creshipment_settings%2Ccomment_permission%2Cmark_infos%2Ccreated_time%2Cupdated_time%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%2Cupvoted_followees%3Bdata%5B%2A%5D.author.badge%5B%3F%28type%3Dbest_answerer%29%5D.topics&limit={1}&offset={2}'

    headers = {
        "Accept": 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        "Host": 'www.zhihu.com',
        "User-Agent": 'Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    }

    def parse(self, response):
        # There is no fixed entry point, so traverse the site's links depth-first
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [parse.urljoin(response.url, url) for url in all_urls]
        all_urls = filter(lambda x: x.startswith("https"), all_urls)
        for url in all_urls:
            match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # Question page: download it and extract the question item
                request_url = match_obj.group(1)
                question_id = match_obj.group(2)
                yield scrapy.Request(request_url, headers=self.headers, meta={"question_id": question_id}, callback=self.parse_question)
            else:
                # Not a question page: follow it and parse the links inside it
                pass
                # yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        # Handle a question page and extract the question item from it
        if "QuestionHeader-title" in response.text:
            # New page layout
            match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")

            question_item = item_loader.load_item()
        else:
            # Item extraction for the old page layout
            match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            # item_loader.add_css("title", ".zh-question-title h2 a::text")
            item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            # item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
            item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")

            question_item = item_loader.load_item()

        yield question_item
        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)

    def parse_answer(self, response):
        # The answer API returns JSON
        ans_json = json.loads(response.text)
        is_end = ans_json['paging']['is_end']
        next_url = ans_json['paging']['next']

        # Extract every answer in this page of results
        for answer in ans_json['data']:
            answer_item = ZhihuAnswerItem()
            answer_item['zhihu_id'] = answer['id']
            answer_item['url'] = answer['url']
            answer_item['question_id'] = answer['question']['id']
            answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else ''
            answer_item['content'] = answer['editable_content'] if 'editable_content' in answer else answer['excerpt']
            answer_item['praise_num'] = answer['voteup_count']
            answer_item['comments_num'] = answer['comment_count']
            answer_item['create_time'] = answer['created_time']
            answer_item['update_time'] = answer['updated_time']
            answer_item['crawl_time'] = datetime.datetime.now()
            yield answer_item

        # Keep following the pagination until the API says this is the last page
        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

    def start_requests(self):
        # Log in first; the normal crawl is resumed from check_login
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]

    def login(self, response):
        response_text = response.text
        match_obj = re.match('.*name="_xsrf" value="(.*)"/>', response_text, re.S)
        xsrf = ''
        if match_obj:
            xsrf = match_obj.group(1)

        if xsrf:
            post_data = {
                "_xsrf": xsrf,
                "email": 'sofeng@gmail.com',
                "password": '7393181'
            }
            import time
            t = str(int(time.time() * 1000))
            captcha_url = "https://www.zhihu.com/captcha.gif?r={}&type=login".format(t)
            # Download the captcha first, carrying the login form along in meta
            yield scrapy.Request(captcha_url, headers=self.headers, meta={'post_data': post_data}, callback=self.login_after_captcha)

    def login_after_captcha(self, response):
        with open('captcha.jpg', 'wb') as f:
            f.write(response.body)

        from PIL import Image
        try:
            im = Image.open('captcha.jpg')
            im.show()
            im.close()
        except:
            pass

        captcha = input("Enter the captcha: ")
        post_data = response.meta.get("post_data", {})
        post_url = "https://www.zhihu.com/login/email"
        post_data['captcha'] = captcha
        return [scrapy.FormRequest(
            url=post_url,
            formdata=post_data,
            headers=self.headers,
            callback=self.check_login
        )]

    def check_login(self, response):
        # Check the server response to decide whether the login succeeded
        text_json = json.loads(response.text)
        if 'msg' in text_json and text_json["msg"] == "登录成功":
            for url in self.start_urls:
                # start_urls is inherited from Spider; resume the normal crawl from here
                yield scrapy.Request(url, dont_filter=True, headers=self.headers)
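With the spider in place (the import above assumes the project package is named articlespider), it can be started from the project root in the usual way:
scrapy crawl zhihu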
items.py
import datetime

import scrapy

# extract_num and the SQL_DATETIME_FORMAT / SQL_DATE_FORMAT constants are
# project helpers defined elsewhere; a possible sketch of them follows this listing.


class ZhihuQuestionItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        # SQL statement for inserting into the zhihu_question table
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
                watch_user_num, click_num, crawl_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
                watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            # Both the follower count and the view count were extracted
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0
        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)
        return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    praise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        # SQL statement for inserting into the zhihu_answer table
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, praise_num, comments_num,
                create_time, update_time, crawl_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), praise_num=VALUES(praise_num), comments_num=VALUES(comments_num)
        """
        create_time = datetime.datetime.fromtimestamp(self['create_time']).strftime(SQL_DATE_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self['update_time']).strftime(SQL_DATE_FORMAT)
        params = (
            self['zhihu_id'], self['url'], self['question_id'], self['author_id'],
            self['content'], self['praise_num'], self['comments_num'], create_time,
            update_time, self['crawl_time'].strftime(SQL_DATE_FORMAT)
        )
        return insert_sql, params
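The two get_insert_sql methods above rely on extract_num and on the SQL_DATETIME_FORMAT / SQL_DATE_FORMAT constants, which are not shown in this section. The names come from the code above, but where they live and how they are written is an assumption; a minimal sketch could look like this:
import re

# Assumed helpers; module location and exact regex are illustrative only.
SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"


def extract_num(text):
    # Pull the first integer out of a string such as "1,234 个回答"
    match_re = re.match(r".*?(\d+).*", text.replace(",", ""))
    return int(match_re.group(1)) if match_re else 0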
pipelines.py
import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class MysqlyibuPipeline(object):
    # Asynchronous MySQL insert pipeline based on twisted's adbapi connection pool
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # Use twisted to turn the MySQL insert into an asynchronous operation
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item  # let any later pipelines see the item

    def handle_error(self, failure, item, spider):
        # Handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        # Run the actual insert
        # insert_sql = 'INSERT INTO jobbole_article (`title`, `create_date`, `url`, `url_object_id`, `content`, `front_image_path`, `comment_nums`, `fav_nums`, `praise_nums`, `tags`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
        # cursor.execute(insert_sql, (item['title'], item['create_date'], item['url'], item['url_object_id'], item['content'], item["front_image_path"], item['comment_nums'], item['fav_nums'], item['praise_nums'], item['tags']))
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)
To keep the pipeline generic, the insert in do_insert is reduced to:
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
so that every SQL statement is defined on its item class in items.py.
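For completeness, the pipeline reads its connection parameters from the Scrapy settings. A minimal sketch of the matching settings.py entries, assuming the articlespider package name used above (the values are placeholders, not the real configuration):
# settings.py (sketch; values are placeholders)
ITEM_PIPELINES = {
    'articlespider.pipelines.MysqlyibuPipeline': 300,
}

MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"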