【Swift 脑洞系列】并行异步运算以及100行的`PromiseKit`

承接上一篇,轻松无痛实现异步操作串行。 如果没看过上一篇,阅读本篇可能会有点懵逼。

在上一篇文章中,我主要描述了如何实现异步串行运算符,+>。并演示了如何基于他来做一些诸如参数的传递和错误的处理等操作。

这篇文章中,我们会基于之前的发现,来实现异步并行运算符 <>。 以及基于 +><> 来做一些有趣的应用。

本文的主要内容:

  • 实现并行折叠运算符: <>
  • 基于 +><>,实现一个简洁优雅的 Promise 接口;

第一部分 能够折叠异步并行操作的运算符

什么是折叠

首先,我们需要定义什么是异步并行? 就是我们同时执行多个异步操作,当所有操作都执行完毕后,执行异步(Complete)回调。比如我们已经有了用户的 ID,需要同时请求用户的头像和基本资料。在两个请求都拿到数据时,刷新界面。

在上一篇文章中,我们在提出运算符 +> 之前,提出了一个连接的概念。指的是把两个异步操作连接起来,一个执行完就执行另一个。通过连接,把两个异步操作合并为一个。

但现在异步并行,显然不能用连接,因为多个请求是一起发生的,没有先后顺序。在本文中,用折叠来表示把多个异步请求以并行的方式合并为一个的过程。

基本分析

首先,回忆一下我们异步串行运算符的签名:


typealias AsyncFunc = (info : AnyObject,complete:(AnyObject?,NSError?)->Void) -> Void

+> : (AsyncFunc,AsyncFunc) -> AsyncFunc

我们通过实现把两个异步操作折叠为一个,来实现串行折叠任意多个异步操作。

并行的思路也是一样的,我们只要实现并行折叠两个异步操作,我们就能折叠任意多个异步操作。

我们首先写出函数的签名:

func <>(left : AsyncFunc, right : AsyncFunc) -> AsyncFunc

为什么我们选择的串行异步运算符 +> 是非对称的,而并行异步运算符 <> 却是对称的呢?这还是由串行异步和并行异步两个运算的性质决定的,串行异步不满足交换律,因为串行就代表了运算本身有先后。而并行却没这个限制。a <> b == b <> a ,但 a +> b != b +> a

按照惯例,我们先根据函数的签名(返回一个函数),撸个基本的架子:

func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
    }
}

架子搭好以后,我们来思考一下如何实现函数体, 有以下几个方面

这里的函数体,是指我们 return 后面的函数的函数体,而不是 <> 的函数体,如果一味思考后者,很容易懵逼。函数式编程的一个关键技巧就是通过类型来拆分抽象层次,局部具体,总体抽象。

  • 主体逻辑
    既然我们的 <> 是用来把两个异步操作并行折叠成一个,所以我们返回的函数体要实现的功能就是同时执行 leftright 这两个函数,当两个函数都执行完毕后(两者都调用了自己的 complete 闭包),再调用最外层的 complete 闭包,也就是我们返回的函数签名的第二个参数
  • 参数传递
    最外层的参数 info, 代表总的输入参数。需要分别在调用 leftright 时传给它们。那如何表达并行折叠后的异步调用的结果呢?我们知道 leftright 作为类型为 AsyncFunc 的异步函数,在它们调用自己的 complete 闭包时都会带上自己的结果。其中一种可选的方式就是把 leftright 的结果通过数组合并,当做折叠后的异步的结果。

实现异步折叠运算符

基于以上的分析,我们大概可以给出如下的实现:


func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
        var leftComplete = false
        var rightComplete = false
        
        var leftResult:AnyObject? = nil
        var rightResult:AnyObject? = nil
        
        let checkComplete = {
            if leftComplete && rightComplete{
                let finalResult:[AnyObject] = [leftResult!, rightResult!]
                complete(finalResult, nil)
            }
        }
        
        left(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }
            
            leftComplete = true
            leftResult = result;
            checkComplete()
        }
        
        right(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }
            
            rightComplete = true
            rightResult = result;
            checkComplete()
        }
    }
}

上面的代码逻辑其实很简单,我们通过一个 checkComplete 函数来检查两个任务是否都已经完成,如果完成则合并两个异步函数返回的结果,并调用最外层的 complete 闭包。 两个异步函数则直接调用,在 complete 闭包中检查是否出错,没有则保存相应的结果,和置对应的标志位。

测试一下

        let delay = dispatch_time(DISPATCH_TIME_NOW, Int64(NSEC_PER_SEC))
        
        let test1:AsyncFunc = { _,complete in
            print("test1")
            
            dispatch_after(delay, dispatch_get_main_queue(), {
                complete(0,nil);
            })
        }
        
        let test2:AsyncFunc = { _,complete in
            print("test2")
            
            dispatch_after(delay, dispatch_get_main_queue(), {
                complete(0,nil);
            })
        }
        
        let test = test1 <> test2;
        
        test(info: 0){ _,_ in print("all finished")};

上述代码中,我们创建了两个异步操作:test1test2。 然后通过我们的并行折叠运算符 <> 折叠为一个: test。之后直接运行 test。

结果输出:

test1
test2
all finished

我们运行折叠后的函数,test1test2 都得到了调用,并且在都完成之后,调用了最外层的 complete 闭包:打印出了 all finished。看上去很完美。

精益求精

但是真的完美了吗?

在上述测试代码中,我们把 main_queue 换成 global_queue之后,我们会发现最外层的 complete 闭包被执行了两次,最终打印了两次 all finished, 这明显不是我们想要的结果。

上面的代码其实会有一个经典的多线程问题,如果 leftrightcomplete 闭包是并发调用的话,就有可能在执行完 leftComplete = true 的时候执行被切走,执行 rightcomplete 闭包,执行完 right 之后继续 left 这边的执行。这个时序就会导致最终被执行两次。

解决也很简单,我们只要加一个变量来当做互斥锁即可,最终的并行折叠运算符修改如下:

func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
        var leftComplete = false
        var rightComplete = false
        var finishedComplete = false
        
        var leftResult:AnyObject? = nil
        var rightResult:AnyObject? = nil
        
        let checkComplete = {
            if leftComplete && rightComplete{
                objc_sync_enter(finishedComplete)
                if !finishedComplete{
                    let finalResult:[AnyObject] = [leftResult!, rightResult!]
                    complete(finalResult, nil)
                    finishedComplete = true
                }
                objc_sync_exit(finishedComplete)
            }
        }
        
        left(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }

            leftComplete = true
            leftResult = result;
            checkComplete()
        }
        
        right(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }

            rightComplete = true
            rightResult = result;
            checkComplete()
        }
    }
}

至此,我们拥有了一个优雅的并行折叠运算符:<>, 和 +> 一样。可以帮助我们简化代码,抽象逻辑。 当然,闲的蛋疼要对其玩一玩map/filter/reduce之类也是支持的,和上篇介绍的思路一样。在此不再赘述。

第二部分,100行实现类 PromiseKit 的接口

镜头切换到一些实际应用的场景,很多时候我们倾向于通过 closure 来组织逻辑,这样可以把本身就耦合的逻辑写在一个地方,也更容易维护。我们的并行折叠和串行连接运算符都是基于函数的,能不能应用在 closure based scenario 呢? let try it.

考虑接口易用性,我们 API 的设计可以直接参(shan)考(zhai) PromiseKit.

PromiseKit GitHub主页的 Readme 给了这样的一个例子:

firstly {
    when(NSURLSession.GET(url).asImage(), CLLocationManager.promise())
}.then { image, location -> Void in
    self.imageView.image = image
    self.label.text = "\(location)"
}.always {
    UIApplication.sharedApplication().networkActivityIndicatorVisible = false
}.error { error in
    UIAlertView(/*…*/).show()
}

我们来分析一下他都做了什么:

仅从 API 字面分析,本文不涉及 PromiseKit 内部真正的实现机制

  • 通过 firstly 注册第一个任务,并返回一个 Promise 对象。用于后面的链式代码书写。
  • when 函数接受两个同步的任务,同时触发两个任务并阻塞当前的执行,直到两个任务都完成。(异步并行的场景),这里虽然 when 会阻塞执行,但 when 本身是运行在主线程中的,也不会阻塞主线程。
  • then 可以有任意多个,顺序执行。then 块中直接用同步的方式写代码。但最终这些任务都会被异步的执行。(异步串行的场景
  • 不管执行过程中是否出错,都会执行 always
  • 如果执行过程中出错,则执行 error 块。

基于以上的分析,我们一步步来实现这几个组件:

firstly

firstly用于接收第一个任务,任务书写是同步的方式,但必须异步运行。

func firstly(body : Void->Void)->Promise{
    let starter: AsyncFunc = { _,complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            body();
            complete(0,nil);
        }
    }

    return Promise(starter: starter)
}

我们的 firstly 实现只做了两件事, 把第一个任务包成异步的,并用这个任务创建了一个 Promise 对象并返回。

因为所有任务最终都是由 Promise 对象来维护的,所以 firstly 只需要把第一个任务直接给他即可。

Promise 类基础

根据之前的分析,我们先把显而易见的架子撸出来:

class Promise {
    var chain : AsyncFunc
    var alwaysClosure : (Void->Void)?
    var errorClosure : (NSError?->Void)?
    
    init(starter : AsyncFunc){
        chain = starter
    }
    
    func then(body : AnyObject throws->Void )->Promise{
        //TO BE IMP
        return self
    }
    
    func always(closure : Void->Void)->Promise{
        alwaysClosure = closure
        return self
    }
    
    func error(closure : NSError?->Void)->Promise{
        errorClosure = closure
        fire()
        return self
    }
    
    func fire(){
        chain(info: 0) { (info, error) in
            if let always = self.alwaysClosure{
                always()
            }
            
            if error == nil{
                print("all task finished")
            }else{
                if let errorC = self.errorClosure{
                    errorC(error)
                }
            }
        }
    }
}

上述代码实现了除 then 函数之外的所有部件。我们把初始任务存在成员 chain 上面,然后分别用成员保存 error closurealways closure, 然后在注册完 error closure 之后调用 fire 来触发 chain 的执行,在 chain 执行完毕后分别执行 always 和是否出错来执行 error.

then, always, error 都返回 self, 实现链式调用。

至此,我们已经实现了能执行一个任务,并且实现 alwayserror 机制的 Promise 对象。

无限的、链式 then 块。

如之前所说,我们把 firstly 传进来的初始任务保存在 chain 这个成员中。那之后的 then 传入的其实就是后续的任务,比如有三个链式的 then,就代表我们需要串行的执行四个任务:初始任务,三个 then块的任务。

所以,我们的 then 函数可以这样来实现:

        let async: AsyncFunc = { info, complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
                var error : NSError?
                do{
                    try body(info)
                }catch let err as NSError{
                    error = err
                }
                complete(0,error)
            }
        }
        chain = chain +> async
        return self
    }

显而易见, then 做的事情和 firstly并无太多区别,首先把传进来的同步任务打包成异步,第二步是把新的任务通过异步串行运算符 +> 合并到成员 chain 中。这样,chain 保存的就不仅仅是初始任务,而是像一个累加器一样,有多少 then, chain就是最终合并的任务。这样,我们不管 then 多少次,每个 then 块中的任务都会被合并到 chain 里。最终我们只需要执行 chain, 即可触发所有任务的链式执行(因为合并用的是 +>)。

注意在 then 块中执行 body 的时候用了 do-catch 结构,目的就是在 then 块接受的任务可以通过 throw 抛出错误,然后在这里捕获,实现错误的感知(如果捕获到错误,则最终会调用 errorClosure

实现 when 函数

我们温习一下上文对 when 函数的分析:

when 函数接受两个同步的任务,同时触发两个任务并阻塞当前的执行,直到两个任务都完成。(异步并行的场景),这里虽然 when 会阻塞执行,但 when 本身是运行在主线程中的,也不会阻塞主线程。

根据 when 函数的定位,只要简单实现成独立的函数即可,不需要实现为 Promise类的成员。

func when(fstBody : (Void->Void), sndBody : (Void->Void)){
    let async1 : AsyncFunc = { _ , complete in
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            fstBody();
            complete(0,nil);
        }
    }
    
    let async2 : AsyncFunc = { _ , complete in
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            sndBody();
            complete(0,nil);
        }
    }
    
    let async = async1 <> async2
    
    var finished = false
    
    async(info: 0) { (_, _) in
        finished = true
    }
    
    while finished == false {
        
    }
}

上述代码中,when 首先把传入的两个同步任务打包成一部,并通过异步并行运算符 <> 合并,然后直接执行合并后的结果。合并后的结果回调时(也就是两个任务都完成时),置 finishedtrue。 末尾用一个 whilefinishedfalse 时阻塞函数的执行。

至此,我们完成了一个最简单的 Promise 的封装, firstlyPromise 主类when 三个组件,加起来一共100行

老规矩,来测试一下

        firstly { () in
            when({ () in
                print(“begin fst job")
                sleep(1)
                print("fst job in when finished")
                }, sndBody: { () in
                    print(“begin snd job")
                    sleep(5)
                    print("snd job in when finished")
            })
        }.then { (info) in
            print("second job")
        }.then { (info) in
            print("third job")
        }.always { () in
            print("always block")
        }.error { (error) in
            print("error occurred")
        }

执行流程:同时执行 when 的两个任务,都完成之后按顺序执行 then, 最后执行 always。因为过程中没有error,所以 error 块没有被调用。

begin fst job
begin snd job
(间隔1秒)fst job in when finished
(间隔4秒)
snd job in when finished
second job
third job
always block

现在来简单修改一下代码,在 second job 里抛出一个 error:

        firstly { () in
            when({ () in
                print(“begin fst job")
                sleep(1)
                print("fst job in when finished")
                }, sndBody: { () in
                    print(“begin snd job")
                    sleep(5)
                    print("snd job in when finished")
            })
        }.then { (info) in
            print("second job")
            throw NSError(domain: "error", code: 0, userInfo: [:])
        }.then { (info) in
            print("third job")
        }.always { () in
            print("always block")
        }.error { (error) in
            print("error occurred")
        }

最终输出:

begin fst job
begin snd job
(间隔1秒)fst job in when finished
(间隔4秒)
snd job in when finished
second job
always block
error occurred

对比之前的结果,因为抛出了错误,所以 error 块得以执行,并且thrid job 没有执行,因为出错中断了 then 链的执行。

总结

  • 上一篇文章中,我们实现了异步串行运算符: +>
  • 本篇文章中,我们首先实现了异步并行运算符: <>
  • 然后,基于上面两个运算符,我们用100行实现了一个简单的 Promise 实现;

本文所有代码: https://github.com/aaaron7/functional_async_demo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容