RxSwift 原生的 flatmap 和 map 其实就可以达到 then 的效果, 但是毕竟用法上有些许区别, 为了熟悉 RxSwift , 我自己写了一个小小的 extension 实现 then catch 和 when 等 Promise 概念中的方法, 让我对 Rx 有了更深一步的认识和理解
import Foundation
import RxSwift
protocol RxResultType {
associatedtype E
}
enum RxResult<T>: RxResultType {
typealias E = T
case fulfilled(T)
case rejected(Error)
}
extension Observable {
static func when<C: Collection>(resolved collection: C) -> Observable<[RxResult<Element>]>
where C.Iterator.Element: ObservableType, C.Iterator.Element.E == Element {
let transformCollection: [Observable<RxResult<Element>>] = collection.map({
$0.map({ (element) -> RxResult<Element> in
return RxResult.fulfilled(element)
}).catchError({ (error) -> Observable<RxResult<Element>> in
return .just(RxResult<Element>.rejected(error))
})
})
return Observable<RxResult<Element>>.combineLatest(transformCollection)
}
func then<R>(execute body: @escaping (Element) throws -> Observable<R>) -> Observable<R> {
let subject = ReplaySubject<Observable<R>>.create(bufferSize: 1)
_ = self.do(onNext: { (element) in
do {
subject.onNext(try body(element))
} catch let err {
subject.onError(err)
}
}, onError: { (error) in
subject.onError(error)
}).subscribe()
let resultSubject = ReplaySubject<R>.create(bufferSize: 1)
_ = subject.subscribe(onNext: { (observable) in
_ = observable.subscribe(onNext: { (result) in
resultSubject.onNext(result)
}, onError: { (error) in
resultSubject.onError(error)
})
}, onError: { (error) in
resultSubject.onError(error)
})
return resultSubject
}
func then<R>(execute body: @escaping (Element) throws -> R) -> Observable<R> {
let subject = ReplaySubject<Observable<R>>.create(bufferSize: 1)
_ = self.do(onNext: { (element) in
do {
subject.onNext(.just(try body(element)))
} catch let err {
subject.onError(err)
}
}, onError: { (error) in
subject.onError(error)
}).subscribe()
let resultSubject = ReplaySubject<R>.create(bufferSize: 1)
_ = subject.subscribe(onNext: { (observable) in
_ = observable.subscribe(onNext: { (result) in
resultSubject.onNext(result)
}, onError: { (error) in
resultSubject.onError(error)
})
}, onError: { (error) in
resultSubject.onError(error)
})
return resultSubject
}
func then(execute body: @escaping (Element) throws -> Void) -> Observable<Void> {
let subject = ReplaySubject<Observable<Void>>.create(bufferSize: 1)
_ = self.do(onNext: { (element) in
do {
try body(element)
subject.onNext(Observable<Void>.just())
} catch let err {
subject.onError(err)
}
}, onError: { (error) in
subject.onError(error)
}).subscribe()
let resultSubject = ReplaySubject<Void>.create(bufferSize: 1)
_ = subject.subscribe(onNext: { (observable) in
_ = observable.subscribe(onNext: { (result) in
resultSubject.onNext(result)
}, onError: { (error) in
resultSubject.onError(error)
})
}, onError: { (error) in
resultSubject.onError(error)
})
return resultSubject
}
func `catch`(body: @escaping (Error) -> Void) -> Observable<Element> {
let subject = ReplaySubject<Observable<Element>>.create(bufferSize: 1)
_ = self.subscribe(onNext: { (element) in
subject.onNext(Observable.just(element))
}, onError: { (error) in
body(error)
subject.onError(error)
})
let resultSubject = ReplaySubject<Element>.create(bufferSize: 1)
_ = subject.subscribe(onNext: { (observable) in
_ = observable.subscribe(onNext: { (result) in
resultSubject.onNext(result)
}, onError: { (error) in
resultSubject.onError(error)
})
}, onError: { (error) in
resultSubject.onError(error)
})
return resultSubject
}
}