07. RxSwift源码解读:Single、Completable、Maybe、Driver、Signal

今天分享一下Observable的几个变种的用法,解读其源码。这几个都是可观察序列,适用于不同的场景。

Single

Single只能发出一个成功和一个失败两种信号,分别是success()和error, 是对error complete onNext信号的变换,我们看看Single的create方法的代码:

    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
        let source = Observable<Element>.create { observer in
            return subscribe { event in
                switch event {
                case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)
                case .failure(let error):
                    observer.on(.error(error))
                }
            }
        }
        return PrimitiveSequence(raw: source)
    }

Single实际是对Observable的封装,内部包含Observable,而且Single只是一个别名,真实类型是PrimitiveSequence。

public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
public typealias SingleEvent<Element> = Result<Element, Swift.Error>

Result 是Swift的自带枚举类型,包含success和failure两种case。
observer的 .next和.comleted 对应的就是.success, error对应failure,所以这个Single只能发出一个信号,特别适用于网路请求。

我们可以看看Single的真实类型PrimitiveSequence的源码:

public struct PrimitiveSequence<Trait, Element> {
    let source: Observable<Element>

    init(raw: Observable<Element>) {
        self.source = raw
    }
}

包含一个原始的序列source,它的asObservable返回的就是这个source,这样可切换到原始序列进行操作。PrimitiveSequence实际上并没有遵循ObservableType,它遵循的是ObservableConvertibleType。

Maybe和Completable

和Single类似,Maybe和Completable的真实类型也是PrimitiveSequence,区别在于:

  • Maybe可以发出succes,error,和completed事件,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。如果你遇到那种可能需要发出一个元素,又可能不需要发出时,就可以使用 Maybe。
  • Completable,只能发出一个complete事件或一个error事件。Completable 适用于那种你只关心任务是否完成,而不需要在意任务返回值的情况。它和 Observable<Void> 有点相似。

Driver和Signal

Driver和Signal也是个特征序列,它们主要是为了简化UI层的代码。
不过如果你遇到的序列具有以下特征,你也可以使用它:

  • 不会产生 error 事件
  • 一定在 MainScheduler 监听(主线程监听)
  • [共享附加作用]

这些都是驱动 UI 的序列所具有的特征。

共享附加作用的意思是,观察者共享源 Observable,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者;它使用share操作符实现此功能。

我们看看Driver的源码:

public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(replay: 1, scope: .whileConnected)
    }
}

extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy {
    /// Adds `asDriver` to `SharingSequence` with `DriverSharingStrategy`.
    public func asDriver() -> Driver<Element> {
        self.asSharedSequence()
    }
}

我们看到Driver也是个别名,真实类型是SharedSequence,DriverSharingStrategy类似一个策略表示Driver共享序列的策略是什么,这里看到Driver共享序列的策略是

source.share(replay: 1, scope: .whileConnected)

回放个数为1,scop:.whileConnected。

关于share操作符已经在之前的文章中详细讲解过,这里不再重复

当我们将序列转换为Driver时一般使用.asDriver(onErrorJustReturn: 1), 我们看看源码:

public func asDriver(onErrorJustReturn: Element) -> Driver<Element> {
        let source = self
            .asObservable()
            .observe(on:DriverSharingStrategy.scheduler)
            .catchAndReturn(onErrorJustReturn)
        return Driver(source)
    }

这里选择DriverSharingStrategy.scheduler这个调度器执行通知,scheduler方法的实现是:

{ SharingScheduler.make() }

make的实现是:

MainScheduler()

在主线程调度执行!这样保证UI事件通知是在主线程执行。接着执行.catchAndReturn(onErrorJustReturn)操作符:

public func catchAndReturn(_ element: Element)
        -> Observable<Element> {
        Catch(source: self.asObservable(), handler: { _ in Observable.just(element) })
    }

返回一个Catch类型实例,Catch也是个Observable,它继承自Producer,保存了原始source和handler,实现了run方法:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

这里会执行CatchSink的run方法:

    func run() -> Disposable {
        let d1 = SingleAssignmentDisposable()
        self.subscription.disposable = d1
        d1.setDisposable(self.parent.source.subscribe(self))

        return self.subscription
    }

这里关键代码是:self.parent.source.subscribe(self), 调用元素序列的subscribe同时传入当前对象,而当前对象即CatchSink实现了on方法:

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            self.forwardOn(event)
        case .completed:
            self.forwardOn(event)
            self.dispose()
        case .error(let error):
            do {
                let catchSequence = try self.parent.handler(error)

                let observer = CatchSinkProxy(parent: self)
                
                self.subscription.disposable = catchSequence.subscribe(observer)
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }

在之前的文章中已经反复说过,CatchSink作为observer传入subscribe,则执行subscribe时,如果发送事件时会调用该类(CatchSink)的on方法,分析下on方法实现:
当事件类型是.next和.completed时,原样转发此事件,而当事件类型是error时,则执行handler,并执行handler返回值的subscribe方法,我们知道handler 方法返回一个Observable.just(element),所以当发送error时,将error事件转换成正常onNext的事件,发送element。发送element的工作交给CatchSinkProxy代理来完成,代理方法on中的实现如下:

func on(_ event: Event<Element>) {
        self.parent.forwardOn(event)
        
        switch event {
        case .next:
            break
        case .error, .completed:
            self.parent.dispose()
        }
    }

先执行被代理对象的forwardOn,接着如果是next则不做任何事情,因为已经执行了被代理对象的forwardOn,如果遇到error则执行被代理对象的dispose(). 释放相关资源。说白了catchAndReturn的功能是处理错误事件,将其转成一个正常的onNext事件。

回到asDriver代码实现地方,上面已经讲解完了catchAndReturn,接着调用Driver(source):

    init(_ source: Observable<Element>) {
        self.source = SharingStrategy.share(source)
    }

这里调用了DriverSharingStrategy的share操作符。DriverSharingStrategy是作为范型被确定的。DriverSharingStrategy的代码上面已经给出:source.share(replay: 1, scope: .whileConnected), 缓存是1个。所以Driver的功能就是组合observe,cacheError,share:保证在主线程监听,不会产生错误事件,共享附加作用。

Driver序列一般使用drive进行订阅。它的实现在SharedSequenceConvertibleType的扩展中,而且限定了SharingStrategy == DriverSharingStrategy,所以drive只能被Driver调用,我们看看drive实现:

Driver的真实类型是SharedSequence,而SharedSequence遵循了
SharedSequenceConvertibleType,所以它能调用drive方法。

public func drive<Observer: ObserverType>(_ observers: Observer...) -> Disposable where Observer.Element == Element? {
        MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
        return self.asSharedSequence()
                   .asObservable()
                   .map { $0 as Element? }
                   .subscribe { e in
                    observers.forEach { $0.on(e) }
                   }
    }

这里先确保在主线程执行,这里是对订阅方法的包装,使用了以下操作符:

  • asSharedSequence 返回自身Driver对象
  • asObservable 返回Driver所保存的source,即原始序列
  • map 将元素转换成可选类型,drive有了两个方法实现,一个有map的一个是没有map的,当观察者的元素类型 是序列元素类型的可选类型时,则使用map先把序列的元素类型转成可选类型,否则如果一致则不需要map。
  • subscribe 调用观察者的on方法,通知所有观察者。

Signal

Signal和Driver非常类似,唯一的区别是,Driver对新观察者回放(重新发送)上一个元素,而 Signal 不会对新观察者回放上一个元素。
从代码中就能看出区别:我们找到SignalSharingStrategy结构体,它的share方法实现是:

public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(scope: .whileConnected)
    }

SignalSharingStrategy与DriverSharingStrategy类似,从上面代码可以看出Signal的共享策略是没有缓存(replay==0),这是唯一区别。
所以一般情况下状态序列我们会选用 Driver 这个类型,事件序列我们会选用 Signal 这个类型。
比如UITextField的text改变事件序列就是状态序列,它会发送text值。UIButton点击事件序列是事件序列,它没有状态,产生事件但不发送元素。

Signal一般使用emit进行订阅,作用和代码与drive基本一样,过~~~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容