今天下午一个小同事问我为啥用scala的Future.sequence之后,future都是并发执行的,而非想像中的是按照串行执行的,导致并发生成了几千个到DB的连接,服务器的线程池也被这些并发的任务塞满,而被reject。我花了好多时间给他解释这个问题,不知道现在搞明白了没有。现在把这个方法共享下(这是我在stackoverflow上回答这个问题的链接,自认为比其他的回答都要好点(毕竟跟原来的是基本一样的,用法也非常相似),大家看到的帮忙vote一下,eagle yuan 的那个回答,不要看错了哟~~:http://stackoverflow.com/questions/25056957/why-future-sequence-executes-my-futures-in-parallel-rather-than-in-series/42416951#42416951
)
下面先贴代码:
trait FutureExt[T] extends Future[T] {
def seq [A, M[X] <: TraversableOnce[X]](in: M[() => Future[A]])(implicit cbf: CanBuildFrom[M[()=>Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(Future.successful(cbf(in))) {
(fr, ffa) => for (r <- fr; a <- ffa()) yield (r += a)
} map (_.result())
}
}
用的话是这个样子的:
val workSeq = Seq(1,2,3).map{x => {()=>Future(some code that put x into db)}}
FutureExt.seq(workSeq).map{ workResultSeq =>
//do the thing
}
最重要的事情是Future执行的时机是它被创建的时候,这个跟是不是flatMap没有什么关系,Future.flatMap 串行的是后面用来“获得”Future的代码,而这个被“获得”的Future可能早就被创建好了。
但是毕竟这个方法就纯粹的串行了,后面有时间的话再加一个可以根据当前线程池大小自动串并结合的。
将比如说10000个任务,分成每次100个并发 然后100 次这样串行并且最后收集10000个结果的方法:
···
//占位符,在这写上面那个任务的代码
···