运用Aggregator模式实现MapReduce

MapReduce是更好地利用并行计算资源来提升数据处理能力的重要算法,如今已被主流的大数据分析平台实现,成为了大数据批量处理的主力军。利用前面介绍的Actor特性,其实我们也可以实现一个简易的MapReduce。

利用AKKA Actor来实现MapReduce,天生就支持并行计算(利用远程Actor)与异步操作。为了简便起见,本例使用了本地的Actor实现了大数据世界的Hello World,即WordCounter。

在编写字数统计器的MapReduce之前,我们需要先分辨职责,包括:

  • 给定网页地址,获取指定网页的内容
  • 对网页内容进行分词
  • 为每个单词统计字数

考虑到本文的中心主题是介绍响应式编程与Actor模型,所以我们降低了案例难度,读取的网页内容均为英文,并简单地以空格作为分词的标志。由于我们需要接受客户端的字数统计分析请求,那么要完成前面提到的职责,至少需要四个Actor:

  • WordCounterClient:发送数据分析请求
  • WordCounterServer:模拟服务端,接收数据分析请求,并最终将统计后的结果返回给WordCounterClient
  • PageContentFetcher:获取网页内容
  • ContentWordCounter:网页内容的字数统计器

为了尽可能地提升性能,对于获取网页内容以及统计内容字数的统计工作,我们都需要多个Actor同时执行。然而,由于每个Actor处理消息都是以异步形式进行,我们该怎样才能知道并发处理的请求都得到了处理?针对字数统计器的案例而言,我们还需要将每个Actor统计获得的字数再进行reduce,同样也需要知道是否每条消息都已经处理完毕,并获得处理的结果。

AKKA通过Aggregator特性实现了Aggregator模式,可以很好地解决刚才提到的问题。它通过引入一个单独的聚合器Actor,用以聚合多个Actor产生的数据,并根据这些Actor对消息的Response更新状态

假定ContentWordCounter分析后的结果如下代码所示:

case class AnalysisResult(wordToCount: Seq[(String, Long)])

那么,Aggregator就可以通过在其内部维持一个分析结果集(即前面所谓的状态,代码中的analysisResults),每收到一个Actor的Response,就将结果塞入到这个结果集(更新状态)中,并判断结果集的长度是否等于要处理的网页数,以此作为消息是否处理完毕的条件。整个Aggregator的实现如下:

class WordCounterAggregator extends Actor with Aggregator {  expectOnce {
    case StartAggregation(target, urls) =>
      new Handler(target, urls, sender)
    case _ =>
      sender ! BadCommand
      context stop self
  }
  class Handler(target: ActorRef, urls: Seq[String], originalSender: ActorRef) {
    var analysisResults = Set.empty[AnalysisResult]
    context.system.scheduler.scheduleOnce(10.seconds, self, Timeout)
    expect {
      case Timeout =>
        respondIfDone(respondAnyway = true)
    }
    urls.foreach { uri =>
      target ! FetchPageContent(uri)
      expectOnce {
        case result: AnalysisResult =>
          analysisResults += result
          respondIfDone()
      }
    }
    def respondIfDone(respondAnyway: Boolean = false) = {
      import MapSeqImplicits._
      if (respondAnyway || analysisResults.size == urls.size) {
        val wordToCounts = analysisResults.flatMap(_.wordToCount).reduceByKey(_ + _)
        originalSender ! AggregatedAnalysisResult(wordToCounts)
        context stop self
      }
    }
  }
}

WordCounterAggregator继承了Aggregator特性,这个特性已经对Actor的receive进行了处理,使得继承该特性的Actor不需要重写receive方法。Aggregator特性提供了expectexpectOnceunexpect,用以接收期待处理的消息。

在Aggregator内部,其实维持了一个expectList,用以存放expect等函数所接收的偏函数。expectexpectOnce都是将偏函数放入到这个列表中,只是后者只留存一次(通过permanent标志来判定),一旦匹配了,就会将该偏函数移除,而expect则不会;至于unexpect,就是expect的反操作,用于将偏函数从列表中移除。

自定义的respondIfDone方法会在满足聚合条件时,对分析结果进行reduce运算。Scala的集合库自身并没有提供reduceByKey()函数,是我模仿Spark的RDD自行编写的隐式转换方法:

object MapSeqImplicits {
  implicit class MapSeqWrapper(wordToCount: Iterable[(String, Long)]) {
    def reduceByKey(f: (Long, Long) => Long): Seq[(String, Long)] = {
      wordToCount.groupBy(_._1).map {
        case (word, counts) => (word, counts.map(_._2).foldLeft(0L)(f))      
      }.toSeq
    }
 }
}

因为引入了一个Aggregator,消息的处理以及Actor之间的协作就变得相对复杂。要进行响应式编程,其中一个关键就是要理清楚数据(或消息)的流动方向,并分辨每个数据处理器的职责。我们可以借助类似状态图之类的可视化工具帮助我们分析数据流动模型。下图是本例的一个消息处理模型,它同时还表达了Actor之间的协作关系。

Actor之间的协作

执行字数统计的流程如下所示:

  • 首先,WordCounterClient接收StartAnalysisWebPages消息,准备分析网页;
  • 由于Client没有这个“能力”完成分析任务,于是求助于WordCounterServer,并发起FetchWebPages消息,要求获取网页内容;
  • WordCounterServer同样是个惫懒货色,什么都不干,转手就将这件事情转交给别的Actor了,所以他其实就是一个前台接待员。如果不需要聚合,它收到的FetchWebPages其实应该交给PageContentFetcher,但现在须得经由WordCounterAggregator来分配请求;所以从另外一个角度来看,这个Aggregator相当于是一个Mediator;
  • 由于Aggregator是一个Mediator,因此它会协调多个PageContentFetcher与ContentWordCounter来并行完成任务;因而Aggregator和这两个Actor之间是一对多关系,而PageContentFetcher与ContentWordCounter则属于一对一关系。当PageContentFetcher获得了网页内容后,就通过CountPageContent消息,将统计字数的职责交给了ContentWordCounter;
  • ContentWordCounter在计算完当前网页的字数后,会将分析结果AnalysisResult返回给Aggregator,并由其完成分析结果的reduce运算,并返回AggregatedAnalysisResult结果给Server;
  • 最后,Server再将Client需要的最终结果返回给Client。

由于Aggregator需要协调多个Fetcher与Counter的Actor,以支持异步并行计算(本例实则是并发计算)的需要,我为其引入了AKKA提供的Router Actor。通过Router可以创建一个容器Actor,内部管理多个worker rootees,并提供了RoundRobin、Random、Boardcast等多种路由形式,用户可以根据Actor的负载情况选择不同的路由方式。

这里,我选择使用RoundRobin以硬编码的形式创建了Router Actor:

val analyst: ActorRef = context.actorOf(Props(new ContentWordCounter(aggregator)), "PageContentAnalyst") 
val fetchers = context.actorOf(RoundRobinPool(4).props(Props(new PageContentFetcher(analyst))), "fetchers")

整体来看,PageContentFetcher与ContentWordCounter其实扮演的是map角色,并通过Router Actor来实现map工作的异步并发处理;而WordCounterAggregator则扮演了reduce角色,它负责将收到的多个分析结果进行reduce运算。

由于缺乏对MapReduce算法必要的封装,用AKKA Actor实现的MapReduce显得比较复杂,但却较好地体现了响应式编程的异步数据流本质。

当我们在使用Actor来处理异步消息传递时,当业务渐趋复杂后,我们常常会迷失在复杂的消息传递网中而无法自拔。为了保持清醒的头脑,需要时刻谨记Actor的职责。以我的经验,我们应该考虑:

  • 从Actor扮演的角色来思考它应该接收什么样的消息;
  • Actor对消息的处理一定要满足单一职责原则,正确地履行职责,也当在合适时候正确地转移职责;
  • 运用状态图帮助思考Actor与其他Actor之间的协作关系;
  • 正确理解AKKA Actor的消息发送机制,当在Actor内部再次发送消息时,是由sender发送,还是通过消息传递过来的actorRef对象发送消息。

要完成多个网页的字数统计功能,除了使用稍显复杂的Actor模式之外,我们也可以直接使用scala提供的并行集合来完成,代码更为精简:

val words = for {
 url <- urls.par
 line <- scala.io.Source.fromURL(url).getLines()
 word <- line.split(" ")
} yield (word)
val analysisResult = words.map(w => (w, 1L)).reduceByKey(_ + _)

在业务相对简单,并不需要非阻塞消息处理,也没有可伸缩性需求的时候,若能恰当运用scala自身提供的par集合会是好的选择。

事实上,为了实现字数统计的功能,采用AKKA提供的Aggregator确乎有些过度。它更擅长于通过将职责分治与合理运用基于消息的Actor模式来完成更为复杂的响应式系统。WordCounter的例子不外乎是我为了更好地解释Aggregator模式而给出的一个Demo罢了。


本文以及《利用Actor实现管道过滤器模式》两篇文章给出的源代码,可以在我的github上获得:https://github.com/agiledon/reactiveprogramming.git

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容