ActorWordCount.scala
package day05
import scala.actors.{Actor, Future}
import scala.collection.mutable.ListBuffer
import scala.io.Source
/**
* 用Actor并发编程实现WordCount
*/
object ActorWordCount {
def main(args: Array[String]): Unit = {
// 存放接收到的每个文件的结果数据
val replys: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
// 存放有值得Future里的数据
val res: ListBuffer[Map[String, Int]] = new ListBuffer[Map[String,Int]]
val files: Array[String] = Array("d:/scala/a.txt","d:/scala/b.txt","d:/scala/c.txt")
for (file <- files) {
// val liens: List[String] = Source.fromFile(file).getLines().toList
// val words: List[String] = lines.flatMap(_.split(" "))
// val res: Map[String, Int] = words.map((_,1)).groupBy(_._1).mapValues(_.size)
val task = new Task
task.start()
// 接收结果数据,异步发送消息,有返回值
val reply: Future[Any] = task !! SmTask(file)
// 把每个文件的数据存储到ListBuffer
replys += reply
}
while (replys.size > 0) {
// 过滤每个Future对象,如果None类型的,就过滤掉
val dones: ListBuffer[Future[Any]] = replys.filter(_.isSet)
for(done <- dones) {
res += done.apply().asInstanceOf[Map[String,Int]]
replys -= done
}
}
// 得到全局聚合的数据
println(res.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)))
}
}
class Task extends Actor {
override def act(): Unit = {
receive({
case SmTask(file) => {
val liens: List[String] = Source.fromFile(file).getLines().toList
val words: List[String] = liens.flatMap(_.split(" "))
val res: Map[String, Int] = words.map((_,1)).groupBy(_._1).mapValues(_.size)
// 异步发送结果数据,没有返回值
sender ! res
}
})
}
}
case class SmTask(file: String)