爬虫究竟是怎样开始的?
爬虫究竟是怎样开始的?这个问题是一个很难的哲学问题。如果看官对技术术语更敏感,那么可以表述为,爬虫,严格说爬虫的scheduler,如果需要保持一个status,到底是poll还是push?
本文将讨论并解决这个问题。(注:不说人话)
从“一次性”爬虫开始说起
上图为最简单的一次爬虫过程。究其本质,这个过程的核心是一个"种子页面"->"目标页面"->"目标页面链接"->"目标页面"的循环。也就是说,给定若干种子页面,和一些目标页面链接的匹配规则Rule,一定可以通过不断循环,把这个网站Site上所有被覆盖到的、符合匹配规则的页面集合P={page ∈ Site|Rule}获取到。这个P是一个有穷集合。P获取到了,爬虫结束。
讨论到这,大概勉强达到大学计算机专业本科期末作业的水平。但是想要及格,或许还应该考虑这么几个方面:
- 数据库:这个话题能截好多图:)
- 多线程:P的数量如果很大,我需要把“下载网页、保存数据库”这样的动作放进多线程里跑跑。
- 前端队列:我需要一个前端队列!“生产者-消费者”模式,呵呵。
- 历史队列:是的,我还需要一个历史队列!不然爬重复了怎么办?
- Javascript:淘宝的网页怎么爬?
- 反爬:这是一个long story,甚至比人类的历史还要久远。
Done。至此我们已经实现了一个异常牛逼的爬虫程序。如果想把“程序”变成“系统”,则需要考虑更多工程上的东西:如何分布式。
稳定性迁移
分布式的终极目的,是要把单机程序(进程)里任何不稳定因素,通过迁移到外部更稳定的程序(进程)的方式,达到系统全局更稳定的目的。另外一方面,通过这几年被“微服务”概念不断地洗脑,工程人员最喜欢用的一个词是“解耦合”。
更进一步,如果我们希望我们的爬虫是一个“永远在线”的服务(引擎),“爬某一个网站的网页”这样的事情被当成某一个任务(task),随时启动、暂停、停止,这不得不让我们重新在架构上考虑得更多。
document-service
之前讨论的几个方面中,最应该第一个抽取出来的服务就是存储服务。无论选择使用MongoDB还是ES来保存网页,对外屏蔽底层的存储方案无疑是个优雅的idea。对外透出save()和saveBatch()操作。
queue-service
抽取出“前端队列”和“历史队列”作为队列服务。如果你将Redis作为选型方案,那么“前端队列”是一个List,支持类似fetch(30)这样的操作;“历史队列”则是一个Set,支持hasVisit()和visit(link)这样的操作。
downloader-service
作为高阶玩家,抽离出网页下载服务也是有必要的。URL进,Document出。downloader对外屏蔽了诸如“异常重试”、“中文编码自适应”、“代理IP池维护和切换”、“Headless浏览器渲染”、“http连接池复用”等一切跟网络IO有关的细节。
sql-service
如前所述,爬取的工作被当成任务来执行。每一项任务自身有着各种meta信息,启动一个任务的示例(instance)也关联运行时的meta信息,这些信息保存在关系数据库中并对外透出CRUD接口,在此不做赘述。
回到正题:scheduler
前文所述的各种service作为工具一样的组件封装完备后,为了让整个爬虫run起来,接下来该讨论整个爬虫引擎中最核心的scheduler-worker问题了。
Master-Worker模式的分布式框架,从Doug Cutting写下Hadoop的第一行代码开始逐渐深入人心。很不幸的是我们并不能从中获得太多启发。
先说worker。由于爬虫引擎是“永远在线”的,那么worker(一个独立的进程)也是永远在线的。因此我们想到了push方案:
- worker-push方案。即,worker们在启动时,把自身注册到scheduler中。scheduler中维护了worker们的通讯ip列表,当有任务启动时,scheduler在列表中随机挑选worker,并在queue-service中fetch(N)条待爬取的URLs,然后post给worker去抓取。
好的,开始自我挑战吧。push最大的一个毛病,就是scheduler需要与worker建立了直接的通信并时刻测试通信。不要试图从Hadoop中找方案,凭直觉我们又想到了poll方案:
- worker-poll方案。即,worker保持一个while/true循环(sleeps may be),不断的看看queue-service里有没有要爬的网页。没有就continue,有就该干啥干啥。当然,worker们监听一个消息队列也是可取的,这样queue-service就不得不做一些改造。
Worker以poll的方式监听消息队列自然是一个省事的好方式,但scheduler侧该如何设计呢?先看push方案:
- scheduler-push方案。worker保持一个while/true循环(sleeps may be),不断的看看种子列表(或队列)里有没有要爬的网页。没有就continue,有就该干啥干啥。
这样做的好处是:首先容易实现,逻辑简单明了,还可以动态调整sleep的时间,我靠太牛逼了!当然坏处是:不管怎么动态调整sleep的时间,始终是有滞后和开销的矛盾。不太符合对实时性有要求的场景,说好的“事件驱动”呢?
屌,我们终于说到“事件驱动了”。我们现在要解决的问题是,如何感知到并且以最小的开销让任务启动?那么下面提供一个push+pull的方案,也是本文作为一篇“记叙文”的中心思想,权当抛砖引玉:
- scheduler-push+pull方案。即,scheduler以较快频率(30秒)poll种子列表的返回数据(网页)是否有变动,如没有变动continue,如有变动解析出任务URL,作为事件push到消息队列中待worker爬取。
再接再厉,现在似乎剩下的最后一个问题就是,如何以最小开销检测种子列表里的网页是否有变动呢?我们联想到了浏览器+刷新的方式。基于开源框架selenium,甚至可以很容易实现对于“网页局部元素是否有更新”这种监听的动作。
show me the code
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.reactivex.schedulers.Schedulers;
import jodd.util.StringUtil;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.openqa.selenium.By;
import org.openqa.selenium.JavascriptExecutor;
import org.openqa.selenium.PageLoadStrategy;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.chrome.ChromeOptions;
import org.openqa.selenium.support.ui.ExpectedConditions;
import org.openqa.selenium.support.ui.WebDriverWait;
import utils.MD5;
/**
* @author craig
* @since 2018年6月21日 上午10:50:54
*/
public class SiteMonitor {
private WebDriver driver;
private Map<SiteBean, Integer> siteBeanMap;
private Map<SiteBean, String> siteTokenMap;
/**
*
*/
public SiteMonitor(String chromePath) {
System.setProperty("webdriver.chrome.driver", chromePath);
ChromeOptions co = new ChromeOptions();
co.setPageLoadStrategy(PageLoadStrategy.NORMAL);
co.setHeadless(true);
driver = new ChromeDriver(co);
siteBeanMap = Maps.newHashMap();
siteTokenMap = Maps.newHashMap();
}
/**
* @throws InterruptedException
*
*/
public Document openInNewTab(WebDriver webDriver, SiteBean siteBean) throws InterruptedException {
List<String> tabs = Lists.newArrayList(driver.getWindowHandles());
((JavascriptExecutor) driver).executeScript("window.open('about:blank','_blank');");
tabs = Lists.newArrayList(driver.getWindowHandles());
siteBeanMap.put(siteBean, tabs.size() - 1);
siteTokenMap.put(siteBean, "");
driver.switchTo().window(tabs.get(tabs.size() - 1));
driver.navigate().to(siteBean.getSiteURL());
return Jsoup.parse(driver.getPageSource(), siteBean.getHost());
}
/**
* @throws InterruptedException
*
*/
public void monitoring(List<SiteBean> siteBeanList) throws InterruptedException {
for (int i = 0; i < siteBeanList.size(); i++) {
openInNewTab(driver, siteBeanList.get(i));
}
Schedulers.trampoline().createWorker().schedulePeriodically(new Runnable() {
@Override
public void run() {
for (int i = 0; i < siteBeanList.size(); i++) {
SiteBean sb = siteBeanList.get(i);
driver.switchTo().window(Lists.newArrayList(driver.getWindowHandles()).get(siteBeanMap.get(sb)));
WebElement newContent = new WebDriverWait(driver, 60)
.until(ExpectedConditions.presenceOfElementLocated(By.cssSelector(sb.getElementLocated())));
String newToken = MD5.getMD5(newContent.getText());
if (!StringUtil.equals(siteTokenMap.get(sb), newToken)) {
siteTokenMap.put(sb, newToken);
String html = newContent.getAttribute("outerHTML");
Document doc = Jsoup.parseBodyFragment(html);
System.out.println(sb.getSiteName() + "有更新:" + doc.text());
System.out.println("~~~~~~~~~~~~~~~~~~~~~~update!~~~~~~~~~~~~~~~~~~`");
} else {
System.out.println(sb.getSiteName() + "无更新");
}
}
}
}, 0, 1, TimeUnit.MINUTES);
}
public static void main(String[] args) throws Exception {
SiteMonitor sm = new SiteMonitor("/Users/craig/chromedriver");
//
List<SiteBean> siteBeanList = Lists.newArrayList();
SiteBean sb = new SiteBean();
sb.setSiteName("财经");
sb.setElementLocated("#instantPanel");
sb.setHost("163.com");
sb.setSiteURL("http://money.163.com/latest/");
siteBeanList.add(sb);
SiteBean sb2 = new SiteBean();
sb2.setSiteName("体育");
sb2.setElementLocated("#instantPanel");
sb2.setHost("163.com");
sb2.setSiteURL("http://sports.163.com/latest");
siteBeanList.add(sb2);
sm.monitoring(siteBeanList);
}
}
如你所见,通过以上代码我们做到了近实时监听种子页面的变化,从而做到了让爬虫引擎永远在线,通过消息队列的方式解耦合了scheduler和worker,从而让爬虫朝稳定性上又迈进了一步。所以这也是本文作为一篇杂文的一个推论。也所以作为一篇杂文如果连推论都写了出来,可想而知这是什么屌杂文。周末了不如回家吃饭好了^^。