本文转自 解读Scala中的async/await
最近在学习 Coursera 上的 Principles of reactive programming,在第三周的课程里,我们需要理解 Scala 中的 Future 和 Promise,因为它们是 reactive programming 的基础。
里面提到了一对很有趣的方法,叫 async 和 await,实现了类似 C# 中的 async/await 关键字(在 Javascript 的 ES7 中也将会实现),可以让我们以类似写同步代码的方式写出实际上是异步执行的代码。
这种方式可以大幅提高可读性,减少不必要的代码嵌套和 callback,感觉很酷,但是如果不知道它背后是什么原理的话,可能不敢放心使用。
在这里将会一步步介绍它产生的原因,以及基本的实现原理。
Future
在 Scala 中,我们可以使用 Future.apply
,将一段耗时的代码放到一个线程池中异步执行,不会阻塞当前线程。
Future.apply
除了接收我们传入的耗时代码,还需要一个 implicit 的ExecutionContext
(相当于一个线程池)用来执行异步代码。这里我们使用了 Scala 提供的一个 global context: scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global
println("### start in main thread: " + Thread.currentThread().getName)
Future {
println("### enter future in pool: " + Thread.currentThread().getName)
Thread.sleep(2000)
println("### exit future in pool: " + Thread.currentThread().getName)
}
println("### end in main thread: " + Thread.currentThread().getName)
执行这段代码,会得到以下的输出:
### start in main thread: main
### end in main thread: main
### enter future in pool: ForkJoinPool-1-worker-5
可以看到 ### end
在 ### enter future
前打印出来(不是绝对,也有可能在后面,但也能说明问题),说明 Future
中的那段耗时代码并不会阻塞当前线程。
通过这样的方式,我们就可以方便地在多个线程中同时执行操作,互不干扰,也不会阻塞当前线程,提高了执行效率。
但显然这样会带来一个问题:我怎么取 Future 中在未来某个时间点计算出来的值?
通过回调操作 Future 产生的值
对于异步代码,通常我们会传入一个回调,让异步代码在未来某个时间计算出值后调用它。
Future
也提供了类似的方法,比如 onComplete
, onSuccess
, onFailure
等。
val pageFuture = Future { getPageFromUrl(...) }
pageFuture.onSuccess { page =>
println("Got page: " + page)
}
对于简单的情况,这样是很方便的。但是如果我们需要对产生的结果再进行一些操作,又得到一些 Future,那么很容易产生大量的回调嵌套,导致代码的可读性变差:
val pageFuture = Future { getPageFromUrl(...) }
pageFuture.onSuccess { page =>
val tree = parsePage(page)
tree.onSuccess { tree =>
val images = findImages(tree)
images.onSuccess { images =>
println("Found images: " + images)
}
}
}
所以 Future
又提供了很多用来组合的方法,让我们不用(或尽量少用)嵌套,比如 map
, flatMap
, foreach
等方法。比如上面的代码使用 flatMap
,可以改写为:
val pageFuture = Future { getPageFromUrl(...) }
pageFuture.flatMap ( page => parsePage(page) )
.flatMap ( tree => findImages(tree) )
.foreach { images =>
println("Found images: " + images)
}
可以看到,之前的嵌套回调变成了顺序回调,功能未变,但代码的可读性得到了极大提高。
当然还可以使用 for
表达式,再次提高可读性:
val pageFuture = Future { getPageFromUrl(...) }
for {
page <- pageFuture
tree <- parsePage(page)
images <- findImages(tree)
} println("Found images: " + images)
Await.ready/result
通过上面的方式,虽然我们可以方便的操作 Future 产生的值,但可能会出现这种情况:一旦使用了 Future,并需要对它在产生的值进行操作,就会发现几乎所有后续代码都会写成异步的形式。
如果想在当前线程中拿到异步的值,该怎么办?
比如测试中,我们需要在主线程中拿到某个 Future 的值进行判断,虽然可以让测试框架模仿 js 中的 jasmine/mocha 那样通过调用一个特别的 done 回调来通知完成,但是如果能直接在主线程中拿到值,代码会更简洁一些。
为了解决这个问题,Scala 提供了一个 scala.concurrent.Await
类,可以让我们调用其 ready/result
等方法,阻塞当前线程,直到传入的 future 完成。ready
与 result
很像,只是前者不关心返回值只等待,而后者还要返回 futures 中产生的值。
其调用方法如下:
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future { doSomething(); 42; }
val value = Await.result(future, 2 seconds)
println(value) // 42
其中前两句导入,是为了支持下面 2 seconds
这种方便的写法。
Await.result
那一句,是说我将在当前线程等待 future
的执行,最多只执行 2 秒。如果 2 秒内执行结束,便马上返回它产生的值 42
;否则会抛出 java.util.concurrent.TimeoutException
。
在等待的过程中,当前线程是阻塞的,后面的代码只能等到 future
成功执行完,才有机会执行。
这种方式可以满足我们的需求,但是不推荐在生产代码中使用。因为一旦使用,就会阻塞当前线程,会让我们在前面辛苦实现的 Future 的工作白做了。
同时操作多个Future时的代码可读性
虽然 Future 提供了各种 map
, flatMap
,让我们在对同一个 Future 的值进行多次后续操作时变得简单,但是当我们需要同时操作多个 Future 时,代码依然比较难看:
val f1 = Future {false}
val f2 = Future {2}
val f3 = Future {3}
f1.foreach { r1 =>
if (r1) {
f2.foreach { r2 =>
println("### " + r2)
}
} else {
f3.foreach { r3 =>
println("### " + r3)
}
}
}
如果我们不考虑前面的 Await.result
带来的负面影响,使用它可以极大的简化代码:
val f1 = Future {false}
val f2 = Future {2}
val f3 = Future {3}
def await[T](future: Future[T]): T = Await.result(future, 1000 seconds)
if (await(f1)) {
println("### " + await(f2))
} else {
println("### " + await(f3))
}
我们能否找到一种鱼与熊掌兼得的方式?
async/await
自从 Scala 提供了宏的功能以后,我们就可以实现各种奇葩的功能,包括这里要介绍的 async/await
: https://github.com/scala/async
这个库实现了 C# 中引入的 async/await
关键字(类似的还有 Javascript 将在 ES7 中提供的 async/await
),让我们可以写出看起来非常简洁的同步形式的代码,但实际执行却是异步的。
我们只需要在项目中添加这个依赖:
"org.scala-lang.modules" %% "scala-async" % "0.9.2"
就可以将上面的代码写成这样:
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
async {
val f1 = async {false}
val f2 = async {2}
val f3 = async {3}
if (await(f1)) {
println("### " + await(f2))
} else {
println("### " + await(f3))
}
}
看起来跟前面的 Await.result
的版本非常像,只是用 async
替换了 Future
,然后使用了它提供了 await
函数。
但是神奇之处在于,这段代码中的 await
是不会阻塞当前线程的!这段代码在编译期,会被转换成异步形式,依然是非阻塞的。(实际代码是用状态机,见后面解释,但可以按前面 future.foreach
那种多重嵌套的代码来理解)
使用这种方式,会有一些条件限制,详情可见其主页。
如果对一个 future 进行连续操作,或者用到了多个 future 但它们之间没什么关系时,尽量使用 flatMap/map
或者 for
表达式。但是对于同时操作多个有关系的 future 时,如果使用 async/await
可以大幅提高代码可读性的时候,可以考虑使用它们。
(另外根据经验,使用宏实现的功能时,很容易出现诡异的问题,难以定位,难以调试)
基本实现原理
在这份文档中,详细描述了它的实现原理:http://docs.scala-lang.org/sips/pending/async.html
文档在最后展示了下面这段代码:
val future = async {
val f1 = async { true }
val x = 1
def inc(t: Int) = t + x
val t = 0
val f2 = async { 42 }
if (await(f1)) await(f2) else { val z = 1; inc(t + z) }
}
在编译器转换为 Scala 代码后会是什么样子。因为里面的命名和结构有点乱,难以阅读,所以我对它进行了一些重构,如下:
object AsyncAwaitImpl {
abstract class StateMachine[T, K]
class MyStateMachine extends StateMachine[Promise[Int], ExecutionContext] {
val f1 = Future {true}
val x = 1
def inc(t: Int) = t + x
val t = 0
var f2 = Future {42}
var state = 0
val resultPromise = Promise[Int]()
var result: Int = 0
var f1Result: Boolean = false
var f2Result: Int = 0
def resumeAsync(): Unit = try {
state match {
case 0 => {
f1.onComplete(this.apply)
}
case 1 => {
result = 0
if (f1Result) {
state = 2
resumeAsync()
} else {
state = 3
resumeAsync()
}
}
case 2 => {
f2.onComplete(this.apply)
}
case 5 => {
result = f2Result
state = 4
resumeAsync()
}
case 3 => {
result = {
val z = 1
inc(t + z)
}
state = 4
resumeAsync()
}
case 4 => resultPromise.complete(Success(result))
}
} catch {
case NonFatal(tr) => resultPromise.complete(Failure(tr))
}
def apply(result: Try[Any]): Unit = state match {
case 0 => {
if (result.isFailure) {
resultPromise.complete(result.asInstanceOf[Try[Int]])
} else {
f1Result = result.get.asInstanceOf[Boolean]
state = 1
resumeAsync()
}
}
case 2 => {
if (result.isFailure) {
resultPromise.complete(result.asInstanceOf[Try[Int]])
} else {
f2Result = result.get.asInstanceOf[Int]
state = 5
resumeAsync()
}
}
}
def apply(): Unit = resumeAsync()
}
val myStateMachine = new MyStateMachine()
Future(myStateMachine())
myStateMachine.resultPromise.future
}
这段代码的可读性已经比较好了,只需要细细阅读一遍就能清楚,这里只说一些重点:
- 实现了一个状态机,对于不同的分支将进入不同的状态,这些状态在从开始到结束期间,将可能跨多个 future
- 对于
await(future)
,将会收集出哪些代码要使用future
的值,然后把它们加到future.onComplete
中,实现异步
在看 async/await
相关的文档时,虽然它们有各种各样详细的说明和介绍,但都没有明确的告诉读者,使用 async/await
后的代码,会不会阻塞当前线程。可能在一些地方隐约的透露了,但对于一个初学者来说,没有明确的说明,总让人不放心,不敢用。
好在后来我在 这个回答 中看到有人明确提到了,并且我自己也做了一些试验,这才放下心来。
关于 Future 中的 Await.result
以及 async/await
是否会阻塞当前代码的例子,我将会在另一篇文章中详细说明。