Today we're going to write an efficient, practical web crawler in Scala! Scala is particularly good at concurrent workloads, and paired with the Akka toolkit it's like bolting a multi-threaded engine onto your crawler, letting it fetch large numbers of pages at once. Below I'll walk you through the whole flow step by step: issuing page requests, parsing the content, and pacing the crawl. You'll find that a Scala crawler is not only fast but also remarkably clean in structure!
What follows is a complete Scala crawler tutorial focused on using Scala's concurrency features, and in particular the Akka Actor model, to build a high-performance web crawler.
Project Overview
We will build a crawler that can fetch multiple web pages concurrently, with the following features:
Sending HTTP requests concurrently
Parsing HTML to extract links and data
Throttling the request rate to avoid getting blocked
Simple data storage
Environment Setup
1. Create an SBT Project
First create a new SBT project and add the following dependencies to build.sbt:
name:="scala-web-crawler"
version:="1.0"
scalaVersion:="2.13.8"
valakkaVersion="2.6.19"
valakkaHttpVersion="10.2.9"
libraryDependencies++=Seq(
// Akka Actor
"com.typesafe.akka"%%"akka-actor-typed"%akkaVersion,
"com.typesafe.akka"%%"akka-stream"%akkaVersion,
// Akka HTTP
"com.typesafe.akka"%%"akka-http"%akkaHttpVersion,
"com.typesafe.akka"%%"akka-http-spray-json"%akkaHttpVersion,
// HTML 解析
"org.jsoup"%"jsoup"%"1.14.3",
// 日志
"ch.qos.logback"%"logback-classic"%"1.2.11",
// 数据库存储 (SQLite)
"org.xerial"%"sqlite-jdbc"%"3.36.0.3",
"com.typesafe.slick"%%"slick"%"3.3.3",
"com.typesafe.slick"%%"slick-hikaricp"%"3.3.3"
)
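All source files below go under src/main/scala/crawler/, and the configuration from step 7 goes in src/main/resources/application.conf. Optionally, pin the sbt version in project/build.properties so the build is reproducible; the exact version below is my own assumption, and any recent sbt 1.x release works with these dependencies:

# project/build.properties
sbt.version=1.6.2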
Implementation
1. Define the Message Protocol (Actor Communication)
// src/main/scala/crawler/Messages.scala
package crawler

sealed trait CrawlerMessage
case class StartCrawling(urls: List[String]) extends CrawlerMessage
case class CrawlUrl(url: String, depth: Int) extends CrawlerMessage
case class PageFetched(url: String, content: String, depth: Int) extends CrawlerMessage
case class ParsePage(url: String, content: String, depth: Int) extends CrawlerMessage
case class LinksFound(url: String, links: List[String], depth: Int) extends CrawlerMessage
case object CrawlingCompleted extends CrawlerMessage
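Because CrawlerMessage is a sealed trait, the compiler can check pattern matches over it for exhaustiveness, which is handy as the protocol grows. The describe helper below is purely illustrative (it is not used by the crawler) and shows what such a match looks like:

// Illustrative only: exhaustive match over the sealed protocol.
def describe(msg: CrawlerMessage): String = msg match {
  case StartCrawling(urls)        => s"start crawling ${urls.size} seed URLs"
  case CrawlUrl(url, depth)       => s"crawl $url at depth $depth"
  case PageFetched(url, _, depth) => s"fetched $url (depth $depth)"
  case ParsePage(url, _, depth)   => s"parse $url (depth $depth)"
  case LinksFound(url, links, _)  => s"found ${links.size} links on $url"
  case CrawlingCompleted          => "crawling completed"
}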
2. Implement the Page Downloader
// src/main/scala/crawler/Downloader.scala
package crawler

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Downloader {
  sealed trait Command
  case class Download(url: String, depth: Int, replyTo: ActorRef[PageFetched]) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // The typed system is needed implicitly so Unmarshal can find a Materializer
    implicit val system: ActorSystem[Nothing] = context.system
    implicit val ec: ExecutionContext = context.executionContext

    Behaviors.receiveMessage {
      case Download(url, depth, replyTo) =>
        context.log.info(s"Downloading: $url (depth: $depth)")
        // Send the request with Akka HTTP
        Http(context.system.classicSystem).singleRequest(HttpRequest(uri = url)).onComplete {
          case Success(response) =>
            response.status match {
              case StatusCodes.OK =>
                Unmarshal(response.entity).to[String].onComplete {
                  case Success(content) =>
                    replyTo ! PageFetched(url, content, depth)
                  case Failure(ex) =>
                    context.log.error(s"Failed to parse content from $url: ${ex.getMessage}")
                }
              case _ =>
                context.log.warn(s"Request to $url failed with status: ${response.status}")
            }
          case Failure(ex) =>
            // Note: failures are only logged; the requester is not notified in this simple version
            context.log.error(s"Request to $url failed: ${ex.getMessage}")
        }
        Behaviors.same
    }
  }
}
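To smoke-test the downloader in isolation, you can spawn it next to a throwaway receiver for PageFetched messages. This is only a sketch; the file name, actor names, and test URL are arbitrary choices:

// src/main/scala/crawler/DownloaderDemo.scala (hypothetical helper for manual testing)
package crawler

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

object DownloaderDemo extends App {
  val root = Behaviors.setup[Nothing] { context =>
    // Throwaway receiver that just logs the size of each fetched page.
    val receiver = context.spawn(
      Behaviors.receive[PageFetched] { (ctx, page) =>
        ctx.log.info(s"Got ${page.content.length} chars from ${page.url}")
        Behaviors.same
      },
      "receiver")
    val downloader = context.spawn(Downloader(), "downloader")
    downloader ! Downloader.Download("https://httpbin.org/html", 0, receiver)
    Behaviors.empty
  }
  ActorSystem[Nothing](root, "DownloaderDemo")
}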
3. Implement the Page Parser
// src/main/scala/crawler/Parser.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import org.jsoup.Jsoup
import org.jsoup.nodes.Document
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object Parser {
  sealed trait Command
  case class Parse(html: String, url: String, depth: Int, replyTo: ActorRef[LinksFound]) extends Command

  def apply(): Behavior[Command] = Behaviors.receive { (context, message) =>
    message match {
      case Parse(html, url, depth, replyTo) =>
        context.log.info(s"Parsing page: $url")
        Try {
          val doc: Document = Jsoup.parse(html, url)
          // Extract the page title
          val title = doc.title()
          // Extract all links, resolved to absolute URLs
          val links = doc.select("a[href]")
            .asScala
            .map(_.attr("abs:href"))
            .filter(link => link.startsWith("http") && !link.contains("#"))
            .toList
          // Extract the body text (simple example)
          val text = doc.body().text()
          (title, links, text)
        } match {
          case Success((title, links, text)) =>
            // Here you could store the extracted data in a database
            context.log.info(s"Found ${links.size} links on $url")
            replyTo ! LinksFound(url, links, depth)
          case Failure(exception) =>
            context.log.error(s"Failed to parse $url: ${exception.getMessage}")
        }
        Behaviors.same
    }
  }
}
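The "abs:href" attribute is what resolves relative links against the base URL passed to Jsoup.parse. A tiny standalone example (not part of the crawler; the HTML and URLs are made up) shows the effect:

// Standalone Jsoup example: "abs:href" turns relative hrefs into absolute URLs.
import org.jsoup.Jsoup
import scala.jdk.CollectionConverters._

object JsoupDemo extends App {
  val html = """<html><body><a href="/page1">One</a> <a href="https://example.com/page2">Two</a></body></html>"""
  val doc = Jsoup.parse(html, "https://example.com")
  val links = doc.select("a[href]").asScala.map(_.attr("abs:href")).toList
  println(links) // List(https://example.com/page1, https://example.com/page2)
}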
4. Implement the Crawler Manager (the Core Actor)
// src/main/scala/crawler/CrawlerManager.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.{Behaviors, Routers}
import scala.collection.mutable

object CrawlerManager {
  sealed trait Command
  case class Start(urls: List[String], maxDepth: Int) extends Command
  case class AddUrl(url: String, depth: Int) extends Command
  case class CrawlCompleted(url: String) extends Command
  // Internal messages
  private case object ProcessNextUrl extends Command
  private case class WrappedPageFetched(page: PageFetched) extends Command
  private case class WrappedLinksFound(found: LinksFound) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // Adapters that wrap replies from the downloader/parser into this actor's own protocol
    val pageFetchedAdapter: ActorRef[PageFetched] =
      context.messageAdapter[PageFetched](WrappedPageFetched.apply)
    val linksFoundAdapter: ActorRef[LinksFound] =
      context.messageAdapter[LinksFound](WrappedLinksFound.apply)

    // Pools of downloaders and parsers, restarted if they fail
    val downloaderPool: ActorRef[Downloader.Command] = {
      val pool = Routers
        .pool(poolSize = 5)(Behaviors.supervise(Downloader()).onFailure[Exception](SupervisorStrategy.restart))
        .withRoundRobinRouting()
      context.spawn(pool, "downloader-pool")
    }
    val parserPool: ActorRef[Parser.Command] = {
      val pool = Routers
        .pool(poolSize = 3)(Behaviors.supervise(Parser()).onFailure[Exception](SupervisorStrategy.restart))
        .withRoundRobinRouting()
      context.spawn(pool, "parser-pool")
    }

    // State
    val visitedUrls = mutable.Set.empty[String]
    val pendingUrls = mutable.Queue.empty[(String, Int)]
    var activeTasks = 0
    var maxDepth = 3

    Behaviors.receiveMessage {
      case Start(urls, depth) =>
        maxDepth = depth
        urls.foreach(url => context.self ! AddUrl(url, 0))
        Behaviors.same

      case AddUrl(url, depth) =>
        if (!visitedUrls.contains(url) && depth <= maxDepth) {
          visitedUrls.add(url)
          pendingUrls.enqueue((url, depth))
          context.self ! ProcessNextUrl
        }
        Behaviors.same

      case ProcessNextUrl =>
        if (pendingUrls.nonEmpty) {
          val (url, depth) = pendingUrls.dequeue()
          activeTasks += 1
          // Dispatch the download
          downloaderPool ! Downloader.Download(url, depth, pageFetchedAdapter)
        } else if (activeTasks == 0) {
          // All queued work is done
          context.log.info("Crawling completed!")
        }
        Behaviors.same

      case WrappedPageFetched(PageFetched(url, content, depth)) =>
        // Hand the fetched page to a parser
        parserPool ! Parser.Parse(content, url, depth, linksFoundAdapter)
        Behaviors.same

      case WrappedLinksFound(LinksFound(url, links, depth)) =>
        context.log.info(s"Found ${links.size} links on $url")
        // Queue the newly discovered links
        links.foreach(link => context.self ! AddUrl(link, depth + 1))
        // Mark the current URL as finished
        context.self ! CrawlCompleted(url)
        Behaviors.same

      case CrawlCompleted(url) =>
        activeTasks -= 1
        context.log.debug(s"Finished: $url")
        context.self ! ProcessNextUrl
        Behaviors.same
    }
  }
}
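A note on the design: in Akka Typed the manager's ActorRef[Command] cannot be handed to the downloader or parser directly, because PageFetched and LinksFound are not part of the manager's Command protocol. The messageAdapter calls above bridge that gap by wrapping each reply in a Command. In isolation the pattern looks roughly like this (ExternalReply and Wrapped are illustrative names, not part of the crawler):

// Sketch of the message-adapter pattern (names are illustrative only).
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

final case class ExternalReply(text: String)

object AdapterExample {
  sealed trait Command
  final case class Wrapped(reply: ExternalReply) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // An ActorRef[ExternalReply] that re-wraps incoming replies as Command messages;
    // `adapter` would be passed to another actor as its replyTo.
    val adapter: ActorRef[ExternalReply] = context.messageAdapter[ExternalReply](Wrapped.apply)
    Behaviors.receiveMessage {
      case Wrapped(reply) =>
        context.log.info(s"received: ${reply.text}")
        Behaviors.same
    }
  }
}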
5. Implement a Rate-Limiting Middleware
// src/main/scala/crawler/RateLimiter.scala
package crawler

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object RateLimiter {
  sealed trait Command
  case class Request(url: String, replyTo: ActorRef[PageFetched]) extends Command
  case object Tick extends Command

  def apply(delay: FiniteDuration): Behavior[Command] = Behaviors.withTimers { timers =>
    timers.startTimerWithFixedDelay(Tick, delay)
    Behaviors.setup { context =>
      val queue = scala.collection.mutable.Queue.empty[(String, ActorRef[PageFetched])]
      Behaviors.receiveMessage {
        case Request(url, replyTo) =>
          queue.enqueue((url, replyTo))
          Behaviors.same
        case Tick =>
          if (queue.nonEmpty) {
            val (url, replyTo) = queue.dequeue()
            context.log.debug(s"Processing: $url")
            // In a real crawler this is where the request would be sent;
            // to keep the example simple we reply with dummy content.
            replyTo ! PageFetched(url, s"Content of $url", 0)
          }
          Behaviors.same
      }
    }
  }
}
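To see the rate limiter on its own, spawn it with a fixed delay and feed it a few requests; the sink actor below just logs the simulated replies. This is a sketch, not wired into the crawler, and the URLs are placeholders:

// src/main/scala/crawler/RateLimiterDemo.scala (hypothetical helper for manual testing)
package crawler

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object RateLimiterDemo extends App {
  val root = Behaviors.setup[Nothing] { context =>
    // Logs one line per simulated fetch, roughly one per second.
    val sink = context.spawn(
      Behaviors.receive[PageFetched] { (ctx, page) =>
        ctx.log.info(s"Rate-limited fetch simulated for ${page.url}")
        Behaviors.same
      },
      "sink")
    val limiter = context.spawn(RateLimiter(1.second), "rate-limiter")
    (1 to 5).foreach(i => limiter ! RateLimiter.Request(s"https://example.com/$i", sink))
    Behaviors.empty
  }
  ActorSystem[Nothing](root, "RateLimiterDemo")
}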
6. The Main Application
// src/main/scala/crawler/Main.scala
package crawler

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object Main extends App {
  // Create the actor system
  val rootBehavior = Behaviors.setup[Nothing] { context =>
    // Create the crawler manager
    val crawlerManager = context.spawn(CrawlerManager(), "crawler-manager")
    // Start crawling
    val startUrls = List(
      "https://httpbin.org/html",
      "https://httpbin.org/links/10/0",
      "https://httpbin.org/links/10/1"
    )
    crawlerManager ! CrawlerManager.Start(startUrls, maxDepth = 2)
    Behaviors.empty
  }

  val system = ActorSystem[Nothing](rootBehavior, "WebCrawlerSystem")

  // Shut the system down after 10 minutes
  import system.executionContext
  system.scheduler.scheduleOnce(10.minutes, () => system.terminate())
}
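One optional addition, not in the original: because Main extends App, the main thread finishes almost immediately and only Akka's non-daemon threads keep the JVM alive. If you prefer to block explicitly until the system has shut down, you can wait on whenTerminated at the end of Main:

// Optional: block the main thread until the actor system has terminated.
import scala.concurrent.Await
import scala.concurrent.duration.Duration
Await.ready(system.whenTerminated, Duration.Inf)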
7. Configure application.conf (in src/main/resources)
akka {
  loglevel = INFO
  http {
    host-connection-pool {
      max-connections = 20
      max-open-requests = 256
    }
  }
}
# Database settings (only needed if you store crawled data)
db {
  config = "crawler.db"
  driver = "org.sqlite.JDBC"
  url = "jdbc:sqlite:crawler.db"
  connectionPool = disabled
  keepAliveConnection = true
}
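The Slick and sqlite-jdbc dependencies declared in build.sbt are only needed if you persist the crawled data. A minimal, hypothetical schema for storing pages could look like the sketch below; the table, column, and object names are illustrative and nothing in the crawler uses them yet:

// src/main/scala/crawler/PageStore.scala (hypothetical, not wired into the crawler)
package crawler

import slick.jdbc.SQLiteProfile.api._

class Pages(tag: Tag) extends Table[(String, String, Int)](tag, "pages") {
  def url   = column[String]("url", O.PrimaryKey)
  def title = column[String]("title")
  def depth = column[Int]("depth")
  def *     = (url, title, depth)
}

object PageStore {
  val pages = TableQuery[Pages]
  // e.g. db.run(pages.schema.createIfNotExists) and db.run(pages += ((url, title, depth)))
}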
Running the Crawler
1. Compile the project:
sbt compile
2. Run the crawler:
sbt run
Extensions
This basic crawler can be extended in several directions:
1. Data storage: add database support to persist the crawled content
2. Proxy support: rotate proxies to avoid IP bans
3. Distributed crawling: use Akka Cluster to spread the crawl across machines
4. JS rendering: integrate Selenium or HtmlUnit to handle JavaScript-rendered pages
5. Task persistence: add checkpointing so an interrupted crawl can resume (a small sketch follows below)
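To give a flavor of item 5, here is a deliberately naive checkpoint sketch that writes the visited-URL set to a text file and reloads it on startup; the file name and format are arbitrary choices, not part of the tutorial code:

// Naive checkpoint sketch: one visited URL per line in a plain text file.
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._

object Checkpoint {
  private val path = Paths.get("visited-urls.txt")

  def save(visited: Set[String]): Unit =
    Files.write(path, visited.toList.asJava)

  def load(): Set[String] =
    if (Files.exists(path)) Files.readAllLines(path).asScala.toSet
    else Set.empty
}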
Summary
This tutorial showed how to build a high-performance concurrent web crawler with Scala and Akka. With the Actor model we get:
1. High concurrency: actor pools process many requests in parallel
2. Fault tolerance: actor isolation ensures that a single failed page parse does not take down the whole system
3. Flow control: queues and rate limiting keep us from overloading target sites
4. Extensibility: new components are easy to bolt on
Scala's functional features and strong type system, combined with Akka's Actor model, make it much easier to build robust, high-performance crawlers.
See? A crawler written in Scala is not only feature-complete, it gets a concurrency boost for free! The Actor model keeps the fetch tasks isolated from one another, so a single failed page won't drag down the whole system. With a little adaptation this architecture can handle more demanding scenarios such as distributed crawling or anti-scraping countermeasures. I hope this example gives you a feel for Scala's strengths in highly concurrent workloads; next time you need to scrape data at scale, consider letting it shine!