纯函数式的并行计算(1)

选择数据类型和函数
“创建并行计算”具体是指什么?我们可以从一个相对简单的例子入手——求一组整数的和。例如下面就是利用左折叠的方法计算求和:

  def sum(ints: Seq[Int]): Int =
    ints.foldLeft(0)(_ + _)

除了叠加算法, 还有一个分治的算法,代码如下:

  def sum1(ints: IndexedSeq[Int]): Int =
    if (ints.length < 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      sum1(l) + sum1(r)
    }

我们使用splitAt函数将序列一分为二,并各自递归求和,最后合并它们的结果。这种实现可以实现并行化,即对两部分的求和可以同时进行。
一种用于并行计算的数据类型
用于表示并行计算的任何数据类型都包含一个结果,这个结果是一个有意义的类型(这里是Int),且能够获取。为此我们设计一个这样的数据类型Par[A](Par是Paralle的简写),它就像一个装有结果的容器,并具备下面的方法:

trait Par[A] {

}

object Par {

  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: => A): Par[A] = ???

  //从并行计算中抽取结果
  def get[A](pa: Par[A]): A = ???
}

现在我们用自定义的数据类型更新求和算法:

  import Par._

  def sum2(ints: IndexedSeq[Int]): Int =
    if (ints.length <= 1)
      ints.headOption.getOrElse(0)
    else {
      val (l, r) = ints.splitAt(ints.length)
      val pl = unit(sum2(l))
      val pr = unit(sum2(r))
      get(pl) + get(pr)
    }

现在我们面临一个选择, 是让unit在一个独立的逻辑线程中立即求值,还是等到get被调用的时候再求值。unit立即求值会导致程序无法并行计算,但是unit返回一个代表一步计算的Par[Int],那在调用get的时候无法避免产生副作用。如何才能避免unit和get的缺陷呢?
组合并行计算
我们可以不调用get函数, 那么函数的代码将如下:

  def sum3(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(sum3(l), sum3(r))(_ + _)
    }

练习 7.1
Par.map2是一个新的高阶函数,用于组合两个并行计算的结果。实现map2函数:

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = get(pa)
    val b = get(pb)
    unit(f(a, b))
  }

显性分流
目前的API没有明确的表明何时应该将计算从主线程中分流出去,换句话说程序员也不知道在哪儿会发生并行计算。如何让分流更加明确呢?我们引入另一个函数来做:

  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = ???

让我们来重写sum函数:

  def sum4(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.length <= 1)
      unit(ints.headOption.getOrElse(0))
    else {
      val (l, r) = ints.splitAt(ints.length)
      map2(folk(sum3(l)), folk(sum3(r)))(_ + _)
    }

对于length <= 1情况我们并不需要folk到一个独立线程中计算。
现在回到unit是严格还是惰性的问题,有了folk,即便unit是严格也不会有什么损失。至于非严格版本我们叫它lazyUnit吧:

  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = ???
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))

到此,我们看出Par只是一个值的函数,表明并行计算。而实际执行并行计算的是get函数,所以我们将get函数改名为run函数,表明这个并行计算实际执行的地方。

  //从并行计算中抽取结果
  def run[A](pa: Par[A]): A = ???

确定表现形式
经过各种思考和选择之后,我们有了下面大致的API。

  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = ???
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  //从并行计算中抽取结果
  def run[A](pa: Par[A]): A = ???
  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = ???
  
  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = {
    val a = run(pa)
    val b = run(pb)
    unit(f(a, b))
  }

练习 7.2
在继续之前,我们尽可能实现API中的函数
如上代码
让我们根据run函数反推Par类型,让我们试着假设run可以访问一个ExecutorService,看能不能搞清Par的样子:

  def run[A](s: ExecutorService)(pa: Par[A]): A = ???

最简单的莫过于,Par[A]是ExecutorService => A,当然这也未免太简单了,为此Par[A],应该是ExecutorService => Future[A],而run直接返回Future:

  type Par[A] = ExecutorService => Future[A]
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)

完善API
既然有了Par的表现形式,不妨就简单直接点,基于Par的表现类型最简单的实现:

object Par {

  import java.util.concurrent.{ExecutorService, Future, Callable}
  
  type Par[A] = ExecutorService => Future[A]

  private case class UnitFuture[A](a: A) extends Future[A] {
    override def isCancelled: Boolean = false

    override def get(): A = a

    override def get(timeout: Long, unit: TimeUnit): A = a

    override def cancel(mayInterruptIfRunning: Boolean): Boolean = false

    override def isDone: Boolean = true
  }
  
  //接受一个已求值的A,返回结果将会在另一个线程中执行
  def unit[A](a: A): Par[A] = es => UnitFuture(a)
  
  //接受一个未求值的A,返回结果将会在另一个线程中执行
  def lazyUnit[A](a: => A): Par[A] = folk(unit(a))
  
  //从并行计算中抽取结果
  def run[A](s: ExecutorService)(pa: Par[A]): Future[A] = pa(s)
  
  //将par[A]分配另一个独立的线程中去运行
  def folk[A](pa: => Par[A]): Par[A] = es => {
    es.submit(new Callable[A] {
      override def call(): A = pa(es).get()
    })
  }

  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = 
    es => {
      val af = pa(es)
      val bf = pb(es)
      UnitFuture(f(af.get(), bf.get()))
    }
  
}

练习7.3
改进Map2的实现,支持超时设置。

  def map2[A, B, C](pa: Par[A], pb: Par[B], 
                    timeout: Long, timeUnit: TimeUnit)(f: (A, B) => C): Par[C] =
    es => {
      val af = pa(es)
      val bf = pb(es)
      val a = af.get(timeout, timeUnit)
      val b = bf.get(timeout, timeUnit)
      UnitFuture(f(a, b))
    }

练习 7.4
使用lazyUnit写一个函数将另一个函数A => B转换为一个一步计算

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

练习 7.5
实现一个叫做sequence的函数。不能使用而外的基础函数,不能调用run。

  def sequence[A](li: List[Par[A]]): Par[List[A]] = {
    def loop(n: Int, res: Par[List[A]]): Par[List[A]] = n match {
      case m if m < 0 => res
      case _ => loop(n - 1, map2(li(n), res)(_ :: _))
    }
    loop(li.length - 1, unit(Nil))
  }

练习 7.6
实现parFilter,并过滤列表元素

  def parFilter[A](li: List[A])(f: A => Boolean): Par[List[A]] = {
    def loop(n: Int, res: List[Par[A]]): List[Par[A]] = n match {
      case m if m < 0 => res
      case _ => 
        if (f(li(n))) loop(n - 1, lazyUnit(li(n)) :: res)
        else loop(n - 1, res)
    }
    sequence(loop(li.length - 1, Nil))
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • //Clojure入门教程: Clojure – Functional Programming for the J...
    葡萄喃喃呓语阅读 3,658评论 0 7
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,623评论 18 399
  • 我正需要一场鹅毛般的大雪, 让它来遮盖昨日的痕迹; 列车已经离开了这个站点 落满雪花的身影又该走向哪里呢? 过去总...
    梁金欠阅读 218评论 0 1
  • 家里有了蜗牛就是不一样。 宝爸询问儿子,“爸爸新下了《极速蜗牛》,看不看?”在房里东奔西跑的儿子立刻答应了。 宝爸...
    铅笔芒种阅读 223评论 0 1