【领略RxSwift源码】- 主题类族(Subject)

(一)前言

前两篇文章分析了RxSwift的整个基础的订阅流程以及变换操作(Operators)的概念实现,有兴趣的读者可以点击以下链接。

【领略RxSwift源码】- 订阅的工作流(Subscribing)
【领略RxSwift源码】- 变换操作(Operators)

本篇文章将阐述Subject的概念以及在RxSwift当中的具体实现,在分析源码的过程中,我们或许会发现一个不一样的世界,或许我们会看到平时看不到的风景。

(二)SubjectType

ReactiveX的世界中,一共定义了4种不同的Subject,分别是AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject。无一例外,这四种Subject都实现了SubjectType协议,当然这也是非常朴素的面向协议了😂。

我们来看看SubjectType协议:

/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
    /// The type of the observer that represents this subject.
    ///
    /// Usually this type is type of subject itself, but it doesn't have to be.
    associatedtype SubjectObserverType : ObserverType

    /// Returns observer interface for subject.
    ///
    /// - returns: Observer interface for subject.
    func asObserver() -> SubjectObserverType
    
}

我们可以看到,在SubjectType中定义了一个ObserverType类型的associatedtype以及一个func asObserver() -> SubjectObserverType的方法。于此同时,它也是继承自Observable。也就是说,一个SubjectType既是一个观察者Observer,又是一个可观察序列(Observable)。

(三)Subject的实现细节

AsyncSubject

问:AsyncSubject 是一个具有什么样特性的Subject?

答:简单的来说,当AsyncSubject被订阅的时候,如果AsyncSubject已经发送过了.complete事件,那么订阅者只能得到最后一个序列的值(如果没有发送过序列那么不触发任何订阅)。如果没有发送.complete事件,那么订阅者一直都不会订阅到值,直到AsyncSubject发送了.complete事件。

我们可以先来看一看AsyncSubject的继承和协议:

public final class AsyncSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType {
...

SubjectType刚刚我们已经看过它的定义了,而ObserverType也在之前的文章中有过认识,那么只剩下SynchronizedUnsubscribeType是还没有见到过的一个协议,看它的名字貌似是“同步取消订阅者”的一个协议,我们来看一下具体的定义:

protocol SynchronizedUnsubscribeType : class {
    associatedtype DisposeKey

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}

emmm...看样子是定义一个DisposeKey,然后可以通过这个DisposeKey来同步取消订阅。而这个DisposeKey其实就是一个BagKey的结构体,这个结构体只有一个UInt64类型的存储属性rawValue,如下:

struct BagKey {
    fileprivate let rawValue: UInt64
}

那么既然是移除所有的订阅者,那么这些订阅者被存储在哪里呢?

在RxSwift中定义了一个数据结构叫做Bag,它是一个用来存储少量元素的高效容器,它的插入删除时间复杂度为O(n)。

struct Bag<T> : CustomDebugStringConvertible { ... }

值得一提的是,在Bag的内部,真正存储元素的容器并不是我们常用的Array类型,而是使用了ContiguousArray。我们可以看一下ContiguousArray的官方解释:

/// If your array's `Element` type is a class or `@objc` protocol and you do
/// not need to bridge the array to `NSArray` or pass the array to Objective-C
/// APIs, using `ContiguousArray` may be more efficient and have more
/// predictable performance than `Array`. If the array's `Element` type is a
/// struct or enumeration, `Array` and `ContiguousArray` should have similar
/// efficiency.

显然,使用ContiguousArray这是因为ContiguousArray在处理class或者@objc修饰的类型的时候更加的高效,而在处理Swift基础类型的时候效率就和Array差不多了。

注:还有值得一提的是&=操作符,这是一个日常开发中很少使用到的一个操作符。与普通的加法操作符(+)的区别在于,当加法操作完成之后的结果类型溢出之后,任然可以安全的使用不至于奔溃;

例如:

let val: Int8 = 64
val + 64 // output: error
val &+ 64 // output: -128

OK~ 我们认识到了在AsyncSubject中使用Box来存储,那么具体的实现AsyncSubject的逻辑是怎么样的呢?我们先来看一下AsyncSubject有着那些属性:

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType

    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        _lock.lock(); defer { _lock.unlock() }
        return _observers.count > 0
    }

    let _lock = RecursiveLock()

    // state
    private var _observers = Observers()
    private var _isStopped = false
    private var _stoppedEvent = nil as Event<Element>? {
        didSet {
            _isStopped = _stoppedEvent != nil
        }
    }
    private var _lastElement: Element?

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

我们可以看到,重点的实现相关逻辑的属性都被标注成了private_observers是一个存储元素类型为Event<Element>) -> ()Box_isStopped是一个Bool类型的flag,一旦发送了complete或者error时间,那么这个flag就会置为true。而_stoppedEvent则是一个可选的Event<Element>类型,它永远是最新发送的一个事件,如果从来没有发送next事件,那么这个属性就永远为空。

由于Subject既有Observer的特性又有Observable的特性,所以我们一个一个看它如何实现这些特性。我们先来看看Observer

    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        let (observers, event) = _synchronized_on(event)
        switch event {
        case .next:
            dispatch(observers, event)
            dispatch(observers, .completed)
        case .completed:
            dispatch(observers, event)
        case .error:
            dispatch(observers, event)
        }
    }

在这里有两个比较关键的函数,一个是_synchronized_on,另一个是dispatch_synchronized_on是实现AsyncSubject的关键函数,我们可以待会了解了其他细节之后再看。

dispatch是一个在Bag+Rx.swift中定义的一个内联函数,它的主要作用是给bag内的所有(Event<E>) -> ()闭包对象派发执行一个指定的事件(Event)。源码如下:

@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
    if bag._onlyFastPath {
        bag._value0?(event)
        return
    }

    let value0 = bag._value0
    let dictionary = bag._dictionary

    if let value0 = value0 {
        value0(event)
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

注:在Swift中我们可以通过@ inline关键字来标识一个函数是内联函数。简单的来说,在Swift中我们有三种"内联策略": sometimes, nerver, always。
sometimes: 当我们申明一个函数的时候,默认这个函数的内联策略就是sometimes的。这个时候,swift的编译器会自动的为它所认为足够短小的函数增添上内联特性,而对于相对而言比较庞大的函数不使用内联特性,以此达到代码执行优化的目的。
always: 当我们需要某个函数强制内联的时候,我们只需要在函数之前加上@inline(__always)关键字。当编译器检测到该关键字的时候,编译器就知道在这里永远都需要内联展开,就不会执行自己的那一套默认的内链优化策略。
nerver: 当我们需要某一个函数永远都不要进行内联的时候,我们只需要在函数之前加上@inline(never)。那么,当编译器检测到该关键字的时候,编译器就知道在这里永远都需要内联展开。

正如我们看到的,dsipatch方法是基于Bag<(Event<E>) -> ()>类型的容器来实现的,之所以之前有一堆复杂的判断逻辑,就是因为优化代码执行效率。当bag中只有一个元素的时候,_onlyFastPathtrue,那么我们只需要取出那一个执行操作就可以了。然而当我们超过一个,小于三十个的时候,我们会将元素存储在ContiguousArray中,通过下标的方式来获取元素进行操作。而当容器内元素超过30这个阈值的时候,我们就要将元素插入到字典中,需要使用的时候再用键值对取出使用。

那么,_synchronized_on到底是如何配合dispatch来实现AsyncSubject的特性的呢?

    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        _lock.lock(); defer { _lock.unlock() }
        if _isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            _lastElement = element
            return (Observers(), .completed)
        case .error:
            _stoppedEvent = event

            let observers = _observers
            _observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = _observers
            _observers.removeAll()

            if let lastElement = _lastElement {
                _stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                _stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

当源序列发送next事件的时候,AsyncSubject仅仅使用内部的_lastElement属性来记录下当前的next事件,然后构造一个空的Bag来执行completed事件(相当于没做什么事情)。

当源序列发送error事件的时候,使用_stoppedEvent来记录最后的最后的事件,然后构造一个observers常量,将自身所有的观察者拷贝到observers常量中,将自身所有的观察者移除,最后把observers和该error事件返回。

(四)Subject的意义

当然,除了AsyncSubject之外,我们还有还有以下几种Subject:

  • PublishSubject: 标准的热信号,订阅者只会接收到订阅操作之后的事件。
  • ReplaySubject:订阅者会接受到订阅之前的事件以及订阅之后的事件,类似于冷信号。
  • BehaviorSubject:订阅之后首先会接收到最近一次发送的事件(如果最近没有发送,那么发送一个初始的事件)。
  • Variable: 基于BehaviorSubject的封装,会将初始值或者最近的值发送给订阅者。

然而写到这里,我并不想一一详细的分析剩下四种的实现细节。因为,与刚刚分析完成的AsyncSubject相比,其余的Subject实现的方法都没有太大的区别。所以笔者也不想在这里流水账似的浪费时间。

不如做一些更有意思的事情:为什么我们需要Subject?

现在我们先不妨设想一个这样的场景:

我们需要追踪用户在iPhone上的每一次点击,当用户点击一次系统就会调用一次screenDidTapped(on point: CGPoint)方法。

那么在ReactiveX中,我们自然可以想到类似这样的做法:

var observer: AnyObserver<CGPoint>!

let tapped = Observable<CGPoint>.create { (observer) -> Disposable in
    observer = observer
    return Disposables.create()
}

func screenDidTapped(on point: CGPoint) {
    observer.on(point)
}

然而这样的实现确实存在一些问题:

  • 一对一的限制

由于Observables的特性限制,如果我们希望有多个观察者来订阅该点击事件,那么Observables是无法做到的。当你存在两个及以上的订阅的时候,只有最新的观察者可以接收到序列的事件信息。

  • 订阅前的行为

还是由于Observables的特性,create构造器的闭包只会在第一次被订阅的时候会调用。然而当点击屏幕的时候,我们并不能保证就一定有观察者订阅了这个序列。

也就是说,当你遇到类似上述的情况的时候,你需要使用热信号(hot observeable)。

Hot Observables VS Cold observables

虽然冷热信号已经是被讲烂的话题了,但是既然写到这里已经是不得不说的地步了。

Bnaya Eshet在他的博客中对"冷热信号"有过一个非常形象的比喻:

if a tree falls in a forest and no one is around to hear it, does it make a sound? if it do make a sound when nobody observed it, we should mark it as hot, otherwise it should be marked as cold.
倘若一颗沙漠中的枯树黯然倾倒而无人问津,是不是可以说它从未对这个世界发出声音。倘若无人关心而算作发出了声音,那么它就是热信号,反之,则是冷信号。

我们再来看看冷热信号的对比:

Hot Observables Cold observables
属于序列 属于序列
无论有或者没有被订阅,都会产生事件。 只有被订阅的时候才会产生事件。
变量、属性、点击操作、鼠标操作、UI的变化等 异步操作、HTTP连接、TCP连接等
通常包含N个Next事件 通常只有一个Next事件
数据源的变化能够作用到所有的订阅者。 数据源的变化只会作用到当前的订阅者。
它是有状态的 它是无状态的

在现实世界的编程中,我们总是面对着各种各样复杂的情景。绝大多数的情况之下,我们的信号流可以是纯函数的,不可变的,安全的。然而当我们面对诸如鼠标追踪,变量的流式表达的时候,不可避免的我们需要使用到热信号。

当我们需要使用到热信号的时候,我们再根据当前的环境选择最适合的Subject。比如我们希望对数据有“回看”功能,那么我们就可以选择使用ReplaySubject。如果我们只关心最后一个数据变化,那么我们可以使用AsyncSubject,诸如此类等等。

(五)结语

对于Rx的使用者来说,我们更加希望使用的是Cold observables。从函数式的角度来说,Cold observables是不可变的,而Hot observables是可变的。不可变的数据总是更加的符合人的心智模型,而更加易于维护和理解,同时也更加的安全。希望这篇文章可以加深读者对于Rx的理解。

参考

  1. https://developer.apple.com/documentation/swift/operator_declarations
  2. http://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/subjects.html
  3. To Use Subject Or Not To Use Subject?
  4. https://github.com/ReactiveX/RxSwift/blob/master/Documentation/HotAndColdObservables.md
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容