【Akka】在并发程序中使用Future

引言

在Akka中, 一个Future是用来获取某个并发操作的结果的数据结构。这个操作通常是由Actor执行或由Dispatcher直接执行的. 这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。
Future提供了一种简单的方式来执行并行算法。

Future直接使用

Future中的一个常见用例是在不需要使用Actor的情况下并发地执行计算。
Future有两种使用方式:

  1. 阻塞方式(Blocking):该方式下,父actor或主程序停止执行知道所有future完成各自任务。通过scala.concurrent.Await使用。
  1. 非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过onCompleteonSuccessonFailure方式使用。

执行上下文(ExecutionContext)

为了运行回调和操作,Futures需要有一个ExecutionContext
如果你在作用域内有一个ActorSystem,它会它自己派发器用作ExecutionContext,你也可以用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹,或者甚至创建自己的实例。
通过导入ExecutionContext.Implicits.global来导入默认的全局执行上下文。你可以把该执行上下文看做是一个线程池,ExecutionContext是在某个线程池执行任务的抽象。
如果在代码中没有导入该执行上下文,代码将无法编译。

阻塞方式

第一个例子展示如何创建一个future,然后通过阻塞方式等待其计算结果。虽然阻塞方式不是一个很好的用法,但是可以说明问题。
这个例子中,通过在未来某个时间计算1+1,当计算结果后再返回。

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureBlockDemo extends App{
  implicit val baseTime = System.currentTimeMillis

  // create a Future
  val f = Future {
    Thread.sleep(500)
    1+1
  }
  // this is blocking(blocking is bad)
  val result = Await.result(f, 1 second)
  // 如果Future没有在Await规定的时间里返回,
  // 将抛出java.util.concurrent.TimeoutException
  println(result)
  Thread.sleep(1000)
}

代码解释:

  1. 在上面的代码中,被传递给Future的代码块会被缺省的Dispatcher所执行,代码块的返回结果会被用来完成Future。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。
  1. Await.result方法将阻塞1秒时间来等待Future结果返回,如果Future在规定时间内没有返回,将抛出java.util.concurrent.TimeoutException异常。
  2. 通过导入scala.concurrent.duration._,可以用一种方便的方式来声明时间间隔,如100 nanos500 millis5 seconds1 minute1 hour3 days。还可以通过Duration(100, MILLISECONDS)Duration(200, "millis")来创建时间间隔。

非阻塞方式(回调方式)

有时你只需要监听Future的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用。
通过onComplete,onSuccess,onFailure三个回调函数来异步执行Future任务,而后两者仅仅是第一项的特例。

使用onComplete的代码示例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object FutureNotBlock extends App{
  println("starting calculation ...")
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    42
  }

  println("before onComplete")
  f.onComplete{
    case Success(value) => println(s"Got the callback, meaning = $value")
    case Failure(e) => e.printStackTrace
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(2000)
}

使用onSuccess、onFailure的代码示例:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.Random

object Test12_FutureOnSuccessAndFailure extends App{
  val f = Future {
    Thread.sleep(Random.nextInt(500))
    if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42
  }

  f onSuccess {
    case result => println(s"Success: $result")
  }

  f onFailure {
    case t => println(s"Exception: ${t.getMessage}")
  }

  // do the rest of your work
  println("A ...")
  Thread.sleep(100)
  println("B ....")
  Thread.sleep(100)
  println("C ....")
  Thread.sleep(100)
  println("D ....")
  Thread.sleep(100)
  println("E ....")
  Thread.sleep(100)

  Thread.sleep(1000)
}

代码解释:
上面两段例子中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
然后在回调函数中进行相关处理。

创建返回Future[T]的方法

先看一下示例:

import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object ReturnFuture extends App{
  implicit val baseTime = System.currentTimeMillis

  // `future` method is another way to create a future
  // It starts the computation asynchronously and retures a Future[Int] that
  // will hold the result of the computation.
  def longRunningComputation(i: Int): Future[Int] = future {
    Thread.sleep(100)
    i + 1
  }

  // this does not block
  longRunningComputation(11).onComplete {
    case Success(result) => println(s"result = $result")
    case Failure(e) => e.printStackTrace
  }

  // keep the jvm from shutting down
  Thread.sleep(1000)
}

代码解释:
上面代码中的longRunningComputation返回一个Future[Int],然后进行相关的异步操作。
其中future方法是创建一个future的另一种方法。它将启动一个异步计算并且返回包含计算结果的Future[T]

Future用于Actor

通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息actor ! msg,这种方法只在发送者是一个Actor时有效;第二种是通过一个Future。
使用Actor的?方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:

import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg
val result = Await.result(future, timeout.duration).asInstanceOf[String]

下面是使用?发送消息给actor,并等待回应的代码示例:

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.concurrent.duration._

case object AskNameMessage

class TestActor extends Actor {
  def receive = {
    case AskNameMessage => // respond to the 'ask' request
      sender ! "Fred"
    case _ => println("that was unexpected")
  }
}
object AskDemo extends App{
  //create the system and actor
  val system = ActorSystem("AskDemoSystem")
  val myActor = system.actorOf(Props[TestActor], name="myActor")

  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future = myActor ? AskNameMessage
  val result = Await.result(future, timeout.duration).asInstanceOf[String]
  println(result)

  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String]
  val result2 = Await.result(future2, 1 second)
  println(result2)

  system.shutdown
}

代码解释:

  1. Await.result(future, timeout.duration).asInstanceOf[String]会导致当前线程被阻塞,并等待actor通过它的应答来完成Future。但是阻塞会导致性能问题,所以是不推荐的。致阻塞的操作位于Await.resultAwait.ready中,这样就方便定位阻塞的位置。
  1. 还要注意actor返回的Future的类型是Future[Any],这是因为actor是动态的。 这也是为什么上例中注释(1)使用了asInstanceOf
  2. 在使用非阻塞方式时,最好使用mapTo方法来将Future转换到期望的类型。如果转换成功,mapTo方法会返回一个包含结果的新的 Future,如果不成功,则返回ClassCastException异常。

转载请注明作者Jason Ding及其出处
Github博客主页(http://jasonding1354.github.io/)
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容