For the past week I have been digging into how to use news data for quantitative investing. Before any text mining or strategy development can start, the first job is, of course, to get the data ready. Phrases like "web crawler" and "data scraping" are thrown around everywhere these days; the work is not hard in principle, but doing it well is not easy. If you ignore the cost of scraping, the data is always there: as long as the target server does not delete it, you can eventually request it. It would be a mistake, though, to get cocky about it. Most servers run some kind of anti-spider defence, but in most cases that defence must not interfere with normal use of the site, which means a site's functional requirements almost always outrank its anti-crawling requirements.
Below we look at how to crawl listed-company news from Sina (新浪网) and 每经网 (National Business Daily).
Before crawling it is convenient to have the database ready. I prefer a NoSQL database here (I won't go into the pros and cons) and picked MongoDB; if you are used to managing data through a GUI, Robomongo is also worth a look.
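As a quick sketch (assuming a local MongoDB instance on the default port, and the database and collection names that appear in the crawler code later in this post), connecting and inspecting the collection with pymongo might look like this:

# -*- coding: utf-8 -*-
from pymongo import MongoClient

# assumes a MongoDB server running on localhost:27017 (the defaults used later in this post)
client = MongoClient("localhost", 27017)
db = client["Sina_Stock"]                 # database that will hold the Sina news
collection = db["sina_news_company"]      # collection the crawler writes into

# MongoDB creates databases and collections lazily on the first insert,
# so this only verifies the connection and reports what is already stored
print(client.server_info()["version"])
print(collection.count_documents({}))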
Next, take a look at the page structure of the two sites:
A single-threaded crawler certainly cannot match a multi-threaded one for speed, whereas coroutines and threads are harder to rank against each other. Coroutines are lightweight threads, but once enough of them are spawned the crawler still breaks down, for example with errors such as "cannot watch more than 1024 sockets". The cleanest fix is to cap the number of coroutines running concurrently.
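A rough sketch of that idea with gevent.pool, where the pool size, the fetch helper and the page range are placeholders rather than the exact values used later:

# -*- coding: utf-8 -*-
from gevent import monkey, pool
monkey.patch_all()                 # patch the socket module so requests cooperates with gevent

import gevent
import requests

def fetch(url):
    # placeholder task: download one list page and report its size
    resp = requests.get(url, timeout=10)
    print(url, len(resp.content))

urls = ['http://roll.finance.sina.com.cn/finance/zq1/ssgs/index_%d.shtml' % i
        for i in range(1, 201)]

p = pool.Pool(50)                  # at most 50 greenlets at once, well below the 1024-socket limit
jobs = [p.spawn(fetch, url) for url in urls]
gevent.joinall(jobs)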
Multiprocessing is not even worth considering: it uses a lot of memory and takes a long time to start. Sina's servers respond quickly and each page carries a fairly large number of bytes, which is exactly the situation where multi-threading has a clear edge over a single thread. Below is the code for crawling Sina's historical listed-company news:
# -*- coding: utf-8 -*-
"""
Created on Mon Jan 22 10:01:40 2018

@author: Damon
"""
import time
import re
import requests
import gevent
from gevent import monkey, pool
monkey.patch_all()
from concurrent import futures
from bs4 import BeautifulSoup
from pymongo import MongoClient


class WebCrawlFromSina(object):
    def __init__(self, *arg, **kwarg):
        self.totalPages = arg[0]              # total number of list pages to crawl
        self.Range = arg[1]                   # number of list pages handled by one task
        self.ThreadsNum = kwarg['ThreadsNum']
        self.dbName = kwarg['dbName']
        self.colName = kwarg['collectionName']
        self.IP = kwarg['IP']
        self.PORT = kwarg['PORT']
        self.Prob = .5                        # threshold on the Chinese-character ratio for body text

    def countchn(self, string):
        # count Chinese characters and their proportion in the given string
        pattern = re.compile(u'[\u1100-\uFFFDh]+?')
        result = pattern.findall(string)
        chnnum = len(result)
        possible = chnnum / len(str(string))
        return (chnnum, possible)

    def getUrlInfo(self, url):
        # extract summary, keywords, date, related stock codes and body text from one article page
        respond = requests.get(url)
        respond.encoding = BeautifulSoup(respond.content, "lxml").original_encoding
        bs = BeautifulSoup(respond.text, "lxml")
        meta_list = bs.find_all('meta')
        span_list = bs.find_all('span')
        part = bs.find_all('p')
        article = ''
        date = ''
        summary = ''
        keyWords = ''
        stockCodeLst = ''
        for meta in meta_list:
            if 'name' in meta.attrs and meta['name'] == 'description':
                summary = meta['content']
            elif 'name' in meta.attrs and meta['name'] == 'keywords':
                keyWords = meta['content']
            if summary != '' and keyWords != '':
                break
        for span in span_list:
            if 'class' in span.attrs:
                if span['class'] == ['date'] or span['class'] == ['time-source']:
                    string = span.text.split()
                    for dt in string:
                        if dt.find('年') != -1:
                            date += dt.replace('年', '-').replace('月', '-').replace('日', ' ')
                        elif dt.find(':') != -1:
                            date += dt
                    break
            if 'id' in span.attrs and span['id'] == 'pub_date':
                string = span.text.split()
                for dt in string:
                    if dt.find('年') != -1:
                        date += dt.replace('年', '-').replace('月', '-').replace('日', ' ')
                    elif dt.find(':') != -1:
                        date += dt
                break
        for span in span_list:
            if 'id' in span.attrs and span['id'].find('stock_') != -1:
                stockCodeLst += span['id'][8:] + ' '
        for paragraph in part:
            # keep only the <p> tags whose Chinese-character ratio exceeds the threshold
            chnstatus = self.countchn(str(paragraph))
            possible = chnstatus[1]
            if possible > self.Prob:
                article += str(paragraph)
        while article.find('<') != -1 and article.find('>') != -1:
            # strip the remaining HTML tags
            string = article[article.find('<'):article.find('>') + 1]
            article = article.replace(string, '')
        while article.find('\u3000') != -1:
            article = article.replace('\u3000', '')
        article = ' '.join(re.split(' +|\n+', article)).strip()
        return summary, keyWords, date, stockCodeLst, article

    def GenPagesLst(self):
        # split pages 1..totalPages into (start, end) ranges of length self.Range
        PageLst = []
        k = 1
        while k + self.Range - 1 <= self.totalPages:
            PageLst.append((k, k + self.Range - 1))
            k += self.Range
        if k <= self.totalPages:              # append the trailing pages, if any are left over
            PageLst.append((k, self.totalPages))
        return PageLst

    def CrawlCompanyNews(self, startPage, endPage):
        self.ConnDB()
        AddressLst = self.extractData(['Address'])[0]
        if AddressLst == []:
            urls = []
            url_Part_1 = 'http://roll.finance.sina.com.cn/finance/zq1/ssgs/index_'
            url_Part_2 = '.shtml'
            for pageId in range(startPage, endPage + 1):
                urls.append(url_Part_1 + str(pageId) + url_Part_2)
            for url in urls:
                print(url)
                resp = requests.get(url)
                resp.encoding = BeautifulSoup(resp.content, "lxml").original_encoding
                bs = BeautifulSoup(resp.text, "lxml")
                a_list = bs.find_all('a')
                for a in a_list:
                    if 'href' in a.attrs and a.string and \
                            a['href'].find('http://finance.sina.com.cn/stock/s/') != -1:
                        summary, keyWords, date, stockCodeLst, article = self.getUrlInfo(a['href'])
                        while article == '' and self.Prob >= .1:
                            # lower the threshold and retry until some body text is extracted
                            self.Prob -= .1
                            summary, keyWords, date, stockCodeLst, article = self.getUrlInfo(a['href'])
                        self.Prob = .5
                        if article != '':
                            data = {'Date': date,
                                    'Address': a['href'],
                                    'Title': a.string,
                                    'Keywords': keyWords,
                                    'Summary': summary,
                                    'Article': article,
                                    'RelevantStock': stockCodeLst}
                            self._collection.insert_one(data)
        else:
            urls = []
            url_Part_1 = 'http://roll.finance.sina.com.cn/finance/zq1/ssgs/index_'
            url_Part_2 = '.shtml'
            for pageId in range(startPage, endPage + 1):
                urls.append(url_Part_1 + str(pageId) + url_Part_2)
            for url in urls:
                print(' ', url)
                resp = requests.get(url)
                resp.encoding = BeautifulSoup(resp.content, "lxml").original_encoding
                bs = BeautifulSoup(resp.text, "lxml")
                a_list = bs.find_all('a')
                for a in a_list:
                    if 'href' in a.attrs and a.string and \
                            a['href'].find('http://finance.sina.com.cn/stock/s/') != -1:
                        if a['href'] not in AddressLst:
                            # skip articles whose URL is already stored in the database
                            summary, keyWords, date, stockCodeLst, article = self.getUrlInfo(a['href'])
                            while article == '' and self.Prob >= .1:
                                self.Prob -= .1
                                summary, keyWords, date, stockCodeLst, article = self.getUrlInfo(a['href'])
                            self.Prob = .5
                            if article != '':
                                data = {'Date': date,
                                        'Address': a['href'],
                                        'Title': a.string,
                                        'Keywords': keyWords,
                                        'Summary': summary,
                                        'Article': article,
                                        'RelevantStock': stockCodeLst}
                                self._collection.insert_one(data)

    def ConnDB(self):
        # connect to MongoDB and keep a handle to the target collection
        Conn = MongoClient(self.IP, self.PORT)
        db = Conn[self.dbName]
        self._collection = db.get_collection(self.colName)

    def extractData(self, tag_list):
        # return the distinct values already stored for each of the given fields
        data = []
        for tag in tag_list:
            data.append(self._collection.distinct(tag))
        return data

    def single_run(self):
        page_ranges_lst = self.GenPagesLst()
        for ind, page_range in enumerate(page_ranges_lst):
            self.CrawlCompanyNews(page_range[0], page_range[1])

    def coroutine_run(self):
        jobs = []
        page_ranges_lst = self.GenPagesLst()
        for page_range in page_ranges_lst:
            jobs.append(gevent.spawn(self.CrawlCompanyNews, page_range[0], page_range[1]))
        gevent.joinall(jobs)

    def multi_threads_run(self, **kwarg):
        page_ranges_lst = self.GenPagesLst()
        print(' Using ' + str(self.ThreadsNum) + ' threads for collecting news ... ')
        with futures.ThreadPoolExecutor(max_workers=self.ThreadsNum) as executor:
            future_to_url = {executor.submit(self.CrawlCompanyNews, page_range[0], page_range[1]):
                             ind for ind, page_range in enumerate(page_ranges_lst)}


if __name__ == '__main__':
    t1 = time.time()
    WebCrawl_Obj = WebCrawlFromSina(5000, 100, ThreadsNum=4, IP="localhost", PORT=27017,
                                    dbName="Sina_Stock", collectionName="sina_news_company")
    WebCrawl_Obj.coroutine_run()    # or WebCrawl_Obj.single_run() / WebCrawl_Obj.multi_threads_run()
    t2 = time.time()
    print(' running time:', t2 - t1)
The countchn function computes the proportion of Chinese characters in a tag; any tag whose ratio exceeds self.Prob (set to 0.5 here) is treated as body text (Article). getUrlInfo extracts the page information, including the date, address, title, keywords, summary, body text and related stock codes. GenPagesLst builds a list of tuples, each of the form (index of the first page to crawl, index of the last page to crawl), which makes it easy to split the work across threads. single_run crawls everything in a single thread, coroutine_run crawls with gevent coroutines, and multi_threads_run crawls with a thread pool from the futures library. A crawl is easily interrupted when the remote server drops the connection or stops responding for a long time, and I don't want to blindly re-crawl redundant data every time the program restarts; so at startup the Address (or Date) values already in the database are loaded, and each newly crawled record is checked against them for duplicates before deciding whether to insert it. A run looks like the figure below:
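One way to push that de-duplication into the database itself (the unique index is my own addition, not part of the code above; the field name Address matches the documents the crawler writes):

# -*- coding: utf-8 -*-
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

client = MongoClient("localhost", 27017)
collection = client["Sina_Stock"]["sina_news_company"]

# a unique index on Address makes MongoDB reject duplicate articles,
# so a restarted crawler cannot insert the same URL twice
collection.create_index("Address", unique=True)

def save_news(doc):
    # doc is a dict shaped like the one built in CrawlCompanyNews
    try:
        collection.insert_one(doc)
    except DuplicateKeyError:
        pass    # already stored by an earlier run; skip it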
Crawling 每经网 (National Business Daily) hit a small snag: while re-inspecting the pages I kept running into dropped connections, and even when the connection held, many of the fetched pages contained only a title, with no body text or timestamp. At first I thought my extraction code was at fault; it turned out the site's anti-crawling measures were blocking me.
So the code has to record separately which URLs were fetched successfully and which were not, and then of course keep retrying the failures until the job is done. Hammering the same link in a tight loop is an easy way to get your IP blocked by the server, and once that happens you will have to switch IPs. It is therefore best to sleep for a while after every so many iterations, or, if you don't find it too much trouble, sleep for a random interval (it looks more human) before continuing. Here I simply sleep for one second and keep crawling.
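A minimal sketch of such a retry loop, assuming a generic parse(url) helper that returns an empty string when the page comes back without a body; the helper name, the retry count and the random sleep are illustrative rather than the exact code used here:

# -*- coding: utf-8 -*-
import time
import random
import requests

def fetch_with_retry(url, parse, max_tries=5):
    # parse(url) is assumed to return '' when the article body could not be extracted
    for attempt in range(max_tries):
        try:
            article = parse(url)
            if article:
                return article
        except requests.RequestException:
            pass                               # dropped connection, timeout, ...
        time.sleep(1 + random.random())        # pause so the retries look less robotic
    return ''                                  # still empty: record the url and retry later

# usage sketch: collect the URLs that still come back empty for a later re-crawl
# failed_urls = [u for u in urls if fetch_with_retry(u, parse_nbd_article) == '']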
The approach: first call CrawlCompanyNews from multiple threads, then gather the URLs that yielded no content into url_lst_withoutNews and pass that list to ReCrawlNews, which re-crawls them one by one in a single thread (a rough sketch of this two-pass arrangement follows below). The final result of the crawl looks like the figure below:
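A rough sketch of that two-pass arrangement; CrawlCompanyNews and ReCrawlNews here stand in for the 每经网 versions, whose code is not shown in this post, and the assumption that the first pass returns the URLs it failed on is mine:

# -*- coding: utf-8 -*-
from concurrent import futures

def crawl_in_two_passes(crawler, page_ranges, threads=4):
    # pass 1: crawl the list pages concurrently; each task is assumed to
    # return the URLs it could not extract any content from
    url_lst_withoutNews = []
    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        tasks = [executor.submit(crawler.CrawlCompanyNews, start, end)
                 for start, end in page_ranges]
        for task in futures.as_completed(tasks):
            url_lst_withoutNews.extend(task.result() or [])

    # pass 2: re-crawl the leftovers one by one in a single thread
    if url_lst_withoutNews:
        crawler.ReCrawlNews(url_lst_withoutNews)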