It has been a while since my last post. Lately I have been building a small Zhihu crawler based on Spring Boot + MyBatis + WebMagic.
WebMagic is a simple, flexible Java crawler framework. Built on WebMagic, a crawler supports multi-threaded crawling, keeps its crawling logic explicit, and stays easy to maintain.
The official flow diagram looks like this:
- `Downloader`: responsible for downloading pages from the internet for later processing.
- `PageProcessor`: extracts the tag logic that a page shares with other pages, parses it, and puts the result into a `Page` object.
- `Scheduler`: manages the URLs waiting to be crawled and handles deduplication.
- `Pipeline`: processes the results extracted by the `PageProcessor`, including computation and persistence to files, databases, and so on. WebMagic ships with two built-in result handlers: "print to console" and "save to file". A `Pipeline` defines how results are persisted, so saving to a specific database means writing a corresponding `Pipeline`; one `Pipeline` per kind of requirement is usually enough.
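To make the division of labor concrete, here is a minimal wiring sketch using WebMagic's stock components (`MyProcessor` is a hypothetical `PageProcessor` implementation, not a class from this project):

```java
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.pipeline.ConsolePipeline;
import us.codecraft.webmagic.scheduler.QueueScheduler;

public class SpiderWiring {
    public static void main(String[] args) {
        Spider.create(new MyProcessor())                    // PageProcessor: the extraction logic
                .setDownloader(new HttpClientDownloader())  // Downloader: fetches pages (WebMagic's default)
                .setScheduler(new QueueScheduler())         // Scheduler: URL queue plus deduplication (the default)
                .addPipeline(new ConsolePipeline())         // Pipeline: print results to the console
                .addUrl("https://github.com/code4craft")    // seed URL
                .thread(5)                                  // crawl with 5 threads
                .run();
    }
}
```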
Project structure
![project.png](https://upload-images.jianshu.io//5309010-3636e4e84fbe07b6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Custom Downloader: HttpClientDownloaderExtend

- Downloader design notes

Since our extension builds on WebMagic's `HttpClientDownloader`, a brief note on its inheritance chain first: `HttpClientDownloader` is designed to make multi-threaded crawling convenient. To see how, let us trace how a request is passed along during download inside `AbstractDownloader`, the class it extends.
- Workflow: send request → download → handle response.
- When we issue a request for a page, an `httpClient` object must be obtained first. Double-checked locking guards the `httpClient` instance against races between threads: if a thread acquires the lock but finds that no instance has yet been created for the domain, it creates one; clients that are no longer needed are treated as closable and added to the set of connections to close.
- Once the connection properties of the current `httpClient` are set, the `handleResponse` method returns the processed `Page` object, which carries all of the page's information: its markup and the set of pages still to be crawled.
- Between sending the request and returning the response, the `download` method constrains the URI to be requested and then hands the response to `handleResponse`, which builds the `Page` object.
Requirements analysis:
We want to crawl links like https://github.com/code4craft, but matching concrete repository links by patching the regex is cumbersome. Suppose we want links such as https://github.com/code4craft/webmagic: by letting the downloader append a configurable URL suffix like /webmagic, we get exactly the effect we want.
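Plugged into a spider, the custom downloader would be used roughly like this sketch (hedged: it assumes the GitHubProcessor defined later in this post, and appends /webmagic to every queued URL before download):

```java
import us.codecraft.webmagic.Spider;
import com.complone.zhihumagic.downloader.HttpClientDownloaderExtend;
import com.complone.zhihumagic.processor.GitHubProcessor;

public class SuffixCrawlDemo {
    public static void main(String[] args) {
        // https://github.com/code4craft becomes https://github.com/code4craft/webmagic,
        // because HttpClientDownloaderExtend appends the suffix inside download()
        Spider.create(new GitHubProcessor())
                .setDownloader(new HttpClientDownloaderExtend("/webmagic"))
                .addUrl("https://github.com/code4craft")
                .thread(5)
                .run();
    }
}
```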
```java
package com.complone.zhihumagic.downloader;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.downloader.AbstractDownloader;
import us.codecraft.webmagic.downloader.HttpClientGenerator;
import us.codecraft.webmagic.selector.PlainText;
import us.codecraft.webmagic.utils.HttpConstant;
import us.codecraft.webmagic.utils.UrlUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @Description Extends HttpClientDownloader so that URLs can be
* reworked at download time
* @author complone
*/
public class HttpClientDownloaderExtend extends AbstractDownloader {
private Logger logger = LoggerFactory.getLogger(getClass());
private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();
private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();
private String urlExtend;
//allow URLs taken from the scheduler queue to be reworked before download
public HttpClientDownloaderExtend(String urlExtend){
this.urlExtend = urlExtend;
}
private CloseableHttpClient getHttpClient(Site site) {
if (site == null) {
return httpClientGenerator.getClient(null);
}
String domain = site.getDomain();
CloseableHttpClient httpClient = httpClients.get(domain);
if (httpClient == null) {
synchronized (this) {
httpClient = httpClients.get(domain);
if (httpClient == null) {
httpClient = httpClientGenerator.getClient(site);
httpClients.put(domain, httpClient);
}
}
}
return httpClient;
}
@Override
public Page download(Request request, Task task) {
Site site = null;
if (task != null) {
site = task.getSite();
}
Set<Integer> acceptStatCode;
String charset = null;
Map<String, String> headers = null;
if (site != null) {
acceptStatCode = site.getAcceptStatCode();
charset = site.getCharset();
headers = site.getHeaders();
} else {
acceptStatCode = Sets.newHashSet(200);
}
request.setUrl(request.getUrl()+urlExtend);
logger.info("downloading page {}", request.getUrl());
CloseableHttpResponse httpResponse = null;
int statusCode=0;
try {
HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers);
httpResponse = getHttpClient(site).execute(httpUriRequest);
statusCode = httpResponse.getStatusLine().getStatusCode();
request.putExtra(Request.STATUS_CODE, statusCode);
if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}
} catch (IOException e) {
logger.warn("download page " + request.getUrl() + " error", e);
if (site != null && site.getCycleRetryTimes() > 0) { //guard: site may be null when task is null
return addToCycleRetry(request, site);
}
onError(request);
return null;
} finally {
request.putExtra(Request.STATUS_CODE, statusCode);
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail", e);
}
}
}
@Override
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}
protected boolean statusAccept(Set<Integer> acceptStatCode, int statusCode) {
return acceptStatCode.contains(statusCode);
}
protected HttpUriRequest getHttpUriRequest(Request request, Site site, Map<String, String> headers) {
RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
if (headers != null) {
for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
}
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectionRequestTimeout(site.getTimeOut())
.setSocketTimeout(site.getTimeOut())
.setConnectTimeout(site.getTimeOut())
.setCookieSpec(CookieSpecs.BEST_MATCH);
if (site.getHttpProxyPool().isEnable()) {
HttpHost host = site.getHttpProxyFromPool();
requestConfigBuilder.setProxy(host);
request.putExtra(Request.PROXY, host);
}
requestBuilder.setConfig(requestConfigBuilder.build());
return requestBuilder.build();
}
protected RequestBuilder selectRequestMethod(Request request) {
String method = request.getMethod();
if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
//default get
return RequestBuilder.get();
} else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
RequestBuilder requestBuilder = RequestBuilder.post();
NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
if (nameValuePair != null && nameValuePair.length > 0) { //the extra may be absent for a bare POST
requestBuilder.addParameters(nameValuePair);
}
return requestBuilder;
} else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
return RequestBuilder.head();
} else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
return RequestBuilder.put();
} else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
return RequestBuilder.delete();
} else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
return RequestBuilder.trace();
}
throw new IllegalArgumentException("Illegal HTTP Method " + method);
}
protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
String content = getContent(charset, httpResponse);
Page page = new Page();
page.setRawText(content);
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
return page;
}
protected String getContent(String charset, HttpResponse httpResponse) throws IOException {
if (charset == null) {
byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
if (htmlCharset != null) {
return new String(contentBytes, htmlCharset);
} else {
logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
return new String(contentBytes);
}
} else {
return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
}
}
protected String getHtmlCharset(HttpResponse httpResponse, byte[] contentBytes) throws IOException {
String charset;
// charset
// 1、encoding in http header Content-Type
String value = httpResponse.getEntity().getContentType().getValue();
charset = UrlUtils.getCharset(value);
if (StringUtils.isNotBlank(charset)) {
logger.debug("Auto get charset: {}", charset);
return charset;
}
// use default charset to decode first time
Charset defaultCharset = Charset.defaultCharset();
String content = new String(contentBytes, defaultCharset.name());
// 2、charset in meta
if (StringUtils.isNotEmpty(content)) {
Document document = Jsoup.parse(content);
Elements links = document.select("meta");
for (Element link : links) {
// 2.1、html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
String metaContent = link.attr("content");
String metaCharset = link.attr("charset");
if (metaContent.indexOf("charset") != -1) {
metaContent = metaContent.substring(metaContent.indexOf("charset"), metaContent.length());
charset = metaContent.split("=")[1];
break;
}
// 2.2、html5 <meta charset="UTF-8" />
else if (StringUtils.isNotEmpty(metaCharset)) {
charset = metaCharset;
break;
}
}
}
logger.debug("Auto get charset: {}", charset);
// 3、todo use tools as cpdetector for content decode
return charset;
}
}
```
The entity class that stores user info: GithubUserInfo
PS: annotation mode is used here. WebMagic also offers `OOSpider`, which crawls pages directly into annotated page-model classes like this one (see the sketch after the class); interested readers can consult the documentation and pair the two.
```java
import us.codecraft.webmagic.model.annotation.ExtractBy;
import us.codecraft.webmagic.model.annotation.HelpUrl;
import us.codecraft.webmagic.model.annotation.TargetUrl;
import javax.persistence.Column;
import javax.persistence.Id;
/**
* Created by complone on 2018/11/2.
*/
@TargetUrl("https://github.com/\\w+/\\w+")
@HelpUrl("https://github.com/\\w+")
public class GithubUserInfo {
@Id
@Column(name = "g_id")
private Integer githubId;
@Column(name = "nickname")
@ExtractBy(value = "//h1[@class='vcard-names']/span[2]/text()")
private String nickname;
@Column(name = "author")
@ExtractBy(value = "//h1[@class='vcard-names']/span[2]/text()")
private String author;
public void setAuthor(String author) {
this.author = author;
}
public void setGithubId(Integer githubId) {
this.githubId = githubId;
}
public void setNickname(String nickname) {
this.nickname = nickname;
}
public Integer getGithubId() {
return githubId;
}
public String getAuthor() {
return author;
}
public String getNickname() {
return nickname;
}
}
```
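For reference, a minimal sketch of that annotation mode: `OOSpider` discovers pages via `@HelpUrl`, matches them against `@TargetUrl`, fills the `@ExtractBy` fields, and hands populated `GithubUserInfo` instances to a `PageModelPipeline` (here WebMagic's built-in console one):

```java
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.model.OOSpider;
import us.codecraft.webmagic.pipeline.ConsolePageModelPipeline;

public class GithubUserInfoSpider {
    public static void main(String[] args) {
        // each matched page is converted into a GithubUserInfo instance and
        // passed to the page-model pipeline instead of raw ResultItems
        OOSpider.create(Site.me().setSleepTime(1000),
                        new ConsolePageModelPipeline(), GithubUserInfo.class)
                .addUrl("https://github.com/code4craft")
                .thread(5)
                .run();
    }
}
```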
Custom PageProcessor: GitHubProcessor
As a first test, we crawl author links on GitHub.

- In the workflow, the `PageProcessor` is the component that holds the extraction logic and stores the result into a persistent object.
- Requirements analysis: extract the relevant page tags; if the author field of the result set is empty, move on to the next page; store successfully crawled results into the persistent object.

PS: in the WebMagic workflow every crawling component should be a bean managed by Spring, so `@Component` must be added to each custom component (why the crawler cannot simply be started inside the processor is discussed further below).
```java
package com.complone.zhihumagic.processor;
import com.complone.zhihumagic.downloader.HttpClientDownloaderExtend;
import com.complone.zhihumagic.mapper.GithubUserInfoMapper;
import com.complone.zhihumagic.model.GithubUserInfo;
import com.complone.zhihumagic.pipeline.GithubUserPipeline;
import com.complone.zhihumagic.service.GithubUserService;
import org.apache.http.HttpHost;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.model.OOSpider;
import us.codecraft.webmagic.pipeline.Pipeline;
import us.codecraft.webmagic.processor.PageProcessor;
@Component
public class GitHubProcessor implements PageProcessor {
@Autowired
private GithubUserService githubUserService;
private static final String start_url = "https://github.com/code4craft";
// Part 1: site-level crawl configuration: charset, crawl interval, retry count, etc.
private Site site = Site.me().setRetryTimes(3).setSleepTime(1000);
// .setHttpProxy(new HttpHost("45.32.50.126",4399));
@Override
// process is the core hook for custom crawl logic; the extraction is written here
public void process(Page page) {
// Part 2: define how page information is extracted and saved
page.putField("author", page.getHtml().xpath("//h1[@class='vcard-names']/span[2]/text()").toString());
page.putField("name", page.getHtml().xpath("//h1[@class='vcard-names']/span[1]/text()").toString());
page.putField("readme", page.getHtml().xpath("//div[@id='readme']/tidyText()"));
if (page.getResultItems().get("name") == null) {
//skip this page
page.setSkip(true);
}
//create a fresh instance per page; a shared field would be a data race once several crawler threads call process() concurrently
GithubUserInfo githubUserInfo = new GithubUserInfo();
githubUserInfo.setAuthor(page.getHtml().xpath("//h1[@class='vcard-names']/span[2]/text()").toString());
githubUserInfo.setNickname(page.getHtml().xpath("//h1[@class='vcard-names']/span[1]/text()").toString());
System.out.println(githubUserInfo.getNickname() + " ------------------ " + githubUserInfo.getAuthor());
// Part 3: discover follow-up URLs on the page for the next round of crawling
page.addTargetRequests(page.getHtml().links().regex("(https://github\\.com/[\\w-]+)").all());
page.putField("githubUserInfo", githubUserInfo);
// githubUserService.insertGithubUserInfo(githubUserInfo);
}
@Override
public Site getSite() {
return site;
}
public void start(PageProcessor pageProcessor, Pipeline pipeline) {
Spider.create(pageProcessor).addUrl(start_url).addPipeline(pipeline).thread(5).run();
}
public static void main(String[] args) {
Spider spider = Spider.create(new GitHubProcessor())
//start crawling from "https://github.com/code4craft"
.addUrl(start_url)
.addPipeline(new GithubUserPipeline())
//crawl with 5 threads
.thread(5);
spider.run();
}
}
```
Custom Pipeline: GithubUserPipeline
In the workflow, the `Pipeline` is the component that exports the persistent object built earlier; here we export it to MySQL.
Requirements analysis: in `process`, check whether the persistent object has an empty author, so that GitHub pages such as https://github.com/topic never make it into the database; at the same time, count the rows inserted while multiple threads are crawling.
```java
package com.complone.zhihumagic.pipeline;
import com.complone.zhihumagic.mapper.GithubUserInfoMapper;
import com.complone.zhihumagic.model.GithubUserInfo;
import com.complone.zhihumagic.service.GithubUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;
import java.util.Map;
/**
* Created by complone on 2018/11/2.
*/
@Component("githubUserPipeline")
public class GithubUserPipeline implements Pipeline{
@Autowired
private GithubUserService githubUserService;
private volatile int count = 0;
@Override
public void process(ResultItems resultItems, Task task) {
GithubUserInfo githubUserInfo = resultItems.get("githubUserInfo");
for (Map.Entry<String, Object> entry : resultItems.getAll().entrySet()) {
System.out.println(entry.getKey() + ":\t" + entry.getValue());
}
if (resultItems.get("author") == null) {
//GitHub pages such as https://github.com/topic carry no author; skip them so no empty row is stored
return;
}
githubUserService.insertGithubUserInfo(githubUserInfo);
count++;
System.out.println("Inserted row #" + count);
}
}
```
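One caveat about the counter above: `volatile` only guarantees visibility, while `count++` is still a read-modify-write, so concurrent `process()` calls can lose updates. A small sketch of a lock-free alternative using `AtomicInteger`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class InsertCounter {
    // incrementAndGet() is atomic, so no updates are lost even when several
    // pipeline threads record inserts at the same time
    private final AtomicInteger count = new AtomicInteger();

    public int recordInsert() {
        return count.incrementAndGet();
    }

    public static void main(String[] args) {
        InsertCounter counter = new InsertCounter();
        System.out.println("Inserted row #" + counter.recordInsert());
    }
}
```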
The controller that starts the crawler: BaseController
GitHubProcessor and GithubUserPipeline are passed into the spider as Spring bean components, which avoids writing to the database before the beans have finished registering.
```java
package com.complone.zhihumagic.controller;
import com.alibaba.fastjson.JSON;
import com.complone.zhihumagic.mapper.GithubUserInfoMapper;
import com.complone.zhihumagic.mapper.UserDetailInfoMapper;
import com.complone.zhihumagic.model.GithubUserInfo;
import com.complone.zhihumagic.model.UserBaseInfo;
import com.complone.zhihumagic.model.UserDetailInfo;
import com.complone.zhihumagic.pipeline.GithubUserPipeline;
import com.complone.zhihumagic.processor.GitHubProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import tk.mybatis.mapper.entity.Example;
import java.util.List;
@Controller
public class BaseController {
@Autowired
private UserDetailInfoMapper userDetailInfoMapper;
@Autowired
private GithubUserInfoMapper githubUserInfoMapper;
@RequestMapping(value = "/searchByName")
public @ResponseBody
List<UserDetailInfo> searchByName(@RequestParam(value = "name", required = true)String name){
Example example1 = new Example(UserBaseInfo.class);
example1.selectProperties("nickname","location","weiboUrl","headline","description");
example1.createCriteria().andLike("nickname", name);
List<UserDetailInfo> result = (List<UserDetailInfo>) userDetailInfoMapper.selectByExample(example1);
System.out.println("查找昵称为"+name+"结果为 "+JSON.toJSONString(result));
return result;
}
@RequestMapping(value = "/test",method = RequestMethod.GET)
public @ResponseBody int test(){
UserDetailInfo ui = new UserDetailInfo();
ui.setPageurl("https://www.geo43.com");
ui.setNickname("v2ex");
int row = userDetailInfoMapper.insertOne(ui);
return row;
}
@RequestMapping(value = "/desc",method = RequestMethod.POST)
public @ResponseBody int testGithub(){
GithubUserInfo githubUserInfo = new GithubUserInfo();
githubUserInfo.setNickname("nacy");
githubUserInfo.setAuthor("complone");
int row = githubUserInfoMapper.insertGithubUserInfo(githubUserInfo);
return row;
}
@Autowired
private GitHubProcessor gitHubProcessor;
@Autowired
private GithubUserPipeline githubUserPipeline;
@RequestMapping("/start")
public String start() {
gitHubProcessor.start(gitHubProcessor,githubUserPipeline);
return "GithubSpider is close!";
}
}
```
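Note that `Spider.run()` is synchronous, so the `/start` request above does not return until the entire crawl finishes. WebMagic's `Spider` also exposes `runAsync()`, which runs the crawl on a daemon thread; a hedged sketch of an asynchronous variant of the endpoint, meant to sit inside the same controller (`/startAsync` is a hypothetical mapping, not part of this project):

```java
@RequestMapping("/startAsync")
public @ResponseBody String startAsync() {
    // runAsync() starts the crawl on a daemon thread and returns immediately,
    // so the HTTP response is not held open for the duration of the crawl
    Spider.create(gitHubProcessor)
            .addUrl("https://github.com/code4craft")
            .addPipeline(githubUserPipeline)
            .thread(5)
            .runAsync();
    return "GithubSpider started!";
}
```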
Persisting concurrent results: SavingTask
SavingTask drains the results that the multi-threaded extraction logic produces concurrently, assembly-line style.
If transactional writes are needed, open a transaction with @Transactional and set the required commit mode explicitly (a hedged sketch follows the class).
```java
package com.complone.zhihumagic;
import com.complone.zhihumagic.mapper.UserDetailInfoMapper;
import com.complone.zhihumagic.model.UserDetailInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.BlockingQueue;
public class SavingTask implements Runnable {
Logger logger = LoggerFactory.getLogger(SavingTask.class);
@Autowired
private UserDetailInfoMapper userDetailInfoMapper;
private BlockingQueue<UserDetailInfo> blockingQueue; //the blocking queue fed by the crawler threads
private volatile boolean isStop = false; //flag marking whether the consumer thread should stop
private volatile int i = 0; //counter for rows stored across threads
public UserDetailInfoMapper getUserDetailInfoMapper() {
return userDetailInfoMapper;
}
public void setUserDetailInfoMapper(UserDetailInfoMapper userDetailInfoMapper) {
this.userDetailInfoMapper = userDetailInfoMapper;
}
public SavingTask(BlockingQueue<UserDetailInfo> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
if (isStop) { //stop flag: terminate the consumer loop
return;
}
UserDetailInfo userDetailInfo = blockingQueue.poll(); //take the next object off the blocking queue
if (userDetailInfo == null) {
try {
Thread.sleep(2000); //queue empty: back off before polling again
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
synchronized (this) { //serialize the write, locking against thread contention
try {
userDetailInfoMapper.insertSelective(userDetailInfo);
logger.info("-------------stored record #{}------------", ++i);
} catch (Exception e) {
logger.error("-------error while saving------{}---{}", e, userDetailInfo);
}
}
}
}
}
public void startSave() {
this.isStop = false; //clear the flag so the consumer loop can run
}
public void stopSave() {
this.isStop = true; //ask run() to exit on its next iteration
}
}
```
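The class above leaves commit behavior to the MyBatis defaults. A hedged sketch of what the @Transactional variant mentioned earlier might look like (UserDetailSaveService is a hypothetical wrapper, not a class from this project):

```java
package com.complone.zhihumagic.service;

import com.complone.zhihumagic.mapper.UserDetailInfoMapper;
import com.complone.zhihumagic.model.UserDetailInfo;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class UserDetailSaveService {

    private final UserDetailInfoMapper userDetailInfoMapper;

    public UserDetailSaveService(UserDetailInfoMapper userDetailInfoMapper) {
        this.userDetailInfoMapper = userDetailInfoMapper;
    }

    // Spring opens a transaction around the insert and commits (or rolls back
    // on any exception) explicitly, instead of relying on autocommit
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void save(UserDetailInfo userDetailInfo) {
        userDetailInfoMapper.insertSelective(userDetailInfo);
    }
}
```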
----------------------------- Divider: to be continued ------------------------------------------