趣味尝试:用 RxSwift 封装 PromiseKit then when catch

RxSwift 原生的 flatmap 和 map 其实就可以达到 then 的效果, 但是毕竟用法上有些许区别, 为了熟悉 RxSwift , 我自己写了一个小小的 extension 实现 then catch 和 when 等 Promise 概念中的方法, 让我对 Rx 有了更深一步的认识和理解

import Foundation
import RxSwift

protocol RxResultType {
    associatedtype E
}

enum RxResult<T>: RxResultType {

    typealias E = T

    case fulfilled(T)
    case rejected(Error)
}

extension Observable {

    static func when<C: Collection>(resolved collection: C) -> Observable<[RxResult<Element>]>
        where C.Iterator.Element: ObservableType, C.Iterator.Element.E == Element {

        let transformCollection: [Observable<RxResult<Element>>] = collection.map({
            $0.map({ (element) -> RxResult<Element> in
                return RxResult.fulfilled(element)
            }).catchError({ (error) -> Observable<RxResult<Element>> in
                return .just(RxResult<Element>.rejected(error))
            })
        })

        return Observable<RxResult<Element>>.combineLatest(transformCollection)
    }

    func then<R>(execute body: @escaping  (Element) throws -> Observable<R>) -> Observable<R> {

        let subject = ReplaySubject<Observable<R>>.create(bufferSize: 1)
        _ = self.do(onNext: { (element) in
            do {
                subject.onNext(try body(element))
            } catch let err {
                subject.onError(err)
            }
        }, onError: { (error) in
            subject.onError(error)
        }).subscribe()

        let resultSubject = ReplaySubject<R>.create(bufferSize: 1)
        _ = subject.subscribe(onNext: { (observable) in
            _ = observable.subscribe(onNext: { (result) in
                resultSubject.onNext(result)
            }, onError: { (error) in
                resultSubject.onError(error)
            })
        }, onError: { (error) in
            resultSubject.onError(error)
        })
        return resultSubject
    }

    func then<R>(execute body: @escaping (Element) throws -> R) -> Observable<R> {

        let subject = ReplaySubject<Observable<R>>.create(bufferSize: 1)

        _ = self.do(onNext: { (element) in
            do {
                subject.onNext(.just(try body(element)))
            } catch let err {
                subject.onError(err)
            }
        }, onError: { (error) in
            subject.onError(error)
        }).subscribe()

        let resultSubject = ReplaySubject<R>.create(bufferSize: 1)
        _ = subject.subscribe(onNext: { (observable) in
            _ = observable.subscribe(onNext: { (result) in
                resultSubject.onNext(result)
            }, onError: { (error) in
                resultSubject.onError(error)
            })
        }, onError: { (error) in
            resultSubject.onError(error)
        })
        return resultSubject
    }

    func then(execute body: @escaping (Element) throws -> Void) -> Observable<Void> {

        let subject = ReplaySubject<Observable<Void>>.create(bufferSize: 1)

        _ = self.do(onNext: { (element) in
            do {
                try body(element)
                subject.onNext(Observable<Void>.just())
            } catch let err {
                subject.onError(err)
            }
        }, onError: { (error) in
            subject.onError(error)
        }).subscribe()

        let resultSubject = ReplaySubject<Void>.create(bufferSize: 1)
        _ = subject.subscribe(onNext: { (observable) in
            _ = observable.subscribe(onNext: { (result) in
                resultSubject.onNext(result)
            }, onError: { (error) in
                resultSubject.onError(error)
            })
        }, onError: { (error) in
            resultSubject.onError(error)
        })
        return resultSubject
    }

    func `catch`(body: @escaping (Error) -> Void) -> Observable<Element> {

        let subject = ReplaySubject<Observable<Element>>.create(bufferSize: 1)

        _ = self.subscribe(onNext: { (element) in
            subject.onNext(Observable.just(element))
        }, onError: { (error) in
            body(error)
            subject.onError(error)
        })

        let resultSubject = ReplaySubject<Element>.create(bufferSize: 1)
        _ = subject.subscribe(onNext: { (observable) in
            _ = observable.subscribe(onNext: { (result) in
                resultSubject.onNext(result)
            }, onError: { (error) in
                resultSubject.onError(error)
            })
        }, onError: { (error) in
            resultSubject.onError(error)
        })

        return resultSubject
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 你不知道JS:异步 第三章:Promises 接上篇3-1 错误处理(Error Handling) 在异步编程中...
    purple_force阅读 1,435评论 0 2
  • 特别说明,为便于查阅,文章转自https://github.com/getify/You-Dont-Know-JS...
    杀破狼real阅读 684评论 0 3
  • 官方中文版原文链接 感谢社区中各位的大力支持,译者再次奉上一点点福利:阿里云产品券,享受所有官网优惠,并抽取幸运大...
    HetfieldJoe阅读 8,706评论 0 29
  • 静夜里的一些往事 如路灯下斑驳的落叶在冷风中 轻轻飞舞,那么美 却终被遗弃在生命长河的一个角落 不再被珍藏起来
    qyuky阅读 249评论 0 0
  • 不惑不是明白一切,而是意识到自己不可能明白一切,因此不会耽于幻想,也不会随遇而安,而是学会在知己知彼的情况下做出选...
    张志强075阅读 366评论 0 0