(一)前言
前两篇文章分析了RxSwift
的整个基础的订阅流程以及变换操作(Operators)的概念实现,有兴趣的读者可以点击以下链接。
【领略RxSwift源码】- 订阅的工作流(Subscribing)
【领略RxSwift源码】- 变换操作(Operators)
本篇文章将阐述Subject
的概念以及在RxSwift
当中的具体实现,在分析源码的过程中,我们或许会发现一个不一样的世界,或许我们会看到平时看不到的风景。
(二)SubjectType
在ReactiveX
的世界中,一共定义了4种不同的Subject
,分别是AsyncSubject
、BehaviorSubject
、PublishSubject
和ReplaySubject
。无一例外,这四种Subject
都实现了SubjectType
协议,当然这也是非常朴素的面向协议了😂。
我们来看看SubjectType
协议:
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
/// The type of the observer that represents this subject.
///
/// Usually this type is type of subject itself, but it doesn't have to be.
associatedtype SubjectObserverType : ObserverType
/// Returns observer interface for subject.
///
/// - returns: Observer interface for subject.
func asObserver() -> SubjectObserverType
}
我们可以看到,在SubjectType
中定义了一个ObserverType
类型的associatedtype
以及一个func asObserver() -> SubjectObserverType
的方法。于此同时,它也是继承自Observable
。也就是说,一个SubjectType
既是一个观察者Observer
,又是一个可观察序列(Observable)。
(三)Subject的实现细节
AsyncSubject
问:AsyncSubject 是一个具有什么样特性的Subject?
答:简单的来说,当AsyncSubject被订阅的时候,如果AsyncSubject已经发送过了
.complete
事件,那么订阅者只能得到最后一个序列的值(如果没有发送过序列那么不触发任何订阅)。如果没有发送.complete
事件,那么订阅者一直都不会订阅到值,直到AsyncSubject
发送了.complete
事件。
我们可以先来看一看AsyncSubject
的继承和协议:
public final class AsyncSubject<Element>
: Observable<Element>
, SubjectType
, ObserverType
, SynchronizedUnsubscribeType {
...
SubjectType
刚刚我们已经看过它的定义了,而ObserverType
也在之前的文章中有过认识,那么只剩下SynchronizedUnsubscribeType
是还没有见到过的一个协议,看它的名字貌似是“同步取消订阅者”的一个协议,我们来看一下具体的定义:
protocol SynchronizedUnsubscribeType : class {
associatedtype DisposeKey
func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}
emmm...看样子是定义一个DisposeKey,然后可以通过这个DisposeKey来同步取消订阅。而这个DisposeKey其实就是一个BagKey
的结构体,这个结构体只有一个UInt64
类型的存储属性rawValue
,如下:
struct BagKey {
fileprivate let rawValue: UInt64
}
那么既然是移除所有的订阅者,那么这些订阅者被存储在哪里呢?
在RxSwift中定义了一个数据结构叫做Bag
,它是一个用来存储少量元素的高效容器,它的插入删除时间复杂度为O(n)。
struct Bag<T> : CustomDebugStringConvertible { ... }
值得一提的是,在Bag
的内部,真正存储元素的容器并不是我们常用的Array
类型,而是使用了ContiguousArray
。我们可以看一下ContiguousArray
的官方解释:
/// If your array's `Element` type is a class or `@objc` protocol and you do
/// not need to bridge the array to `NSArray` or pass the array to Objective-C
/// APIs, using `ContiguousArray` may be more efficient and have more
/// predictable performance than `Array`. If the array's `Element` type is a
/// struct or enumeration, `Array` and `ContiguousArray` should have similar
/// efficiency.
显然,使用ContiguousArray
这是因为ContiguousArray
在处理class
或者@objc
修饰的类型的时候更加的高效,而在处理Swift基础类型的时候效率就和Array
差不多了。
注:还有值得一提的是
&=
操作符,这是一个日常开发中很少使用到的一个操作符。与普通的加法操作符(+
)的区别在于,当加法操作完成之后的结果类型溢出之后,任然可以安全的使用不至于奔溃;例如:
let val: Int8 = 64 val + 64 // output: error val &+ 64 // output: -128
OK~ 我们认识到了在AsyncSubject
中使用Box
来存储,那么具体的实现AsyncSubject
的逻辑是怎么样的呢?我们先来看一下AsyncSubject
有着那些属性:
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
_lock.lock(); defer { _lock.unlock() }
return _observers.count > 0
}
let _lock = RecursiveLock()
// state
private var _observers = Observers()
private var _isStopped = false
private var _stoppedEvent = nil as Event<Element>? {
didSet {
_isStopped = _stoppedEvent != nil
}
}
private var _lastElement: Element?
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
我们可以看到,重点的实现相关逻辑的属性都被标注成了private
。_observers
是一个存储元素类型为Event<Element>) -> ()
的Box
,_isStopped
是一个Bool
类型的flag,一旦发送了complete
或者error
时间,那么这个flag
就会置为true
。而_stoppedEvent
则是一个可选的Event<Element>
类型,它永远是最新发送的一个事件,如果从来没有发送next
事件,那么这个属性就永远为空。
由于Subject
既有Observer
的特性又有Observable
的特性,所以我们一个一个看它如何实现这些特性。我们先来看看Observer
:
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
let (observers, event) = _synchronized_on(event)
switch event {
case .next:
dispatch(observers, event)
dispatch(observers, .completed)
case .completed:
dispatch(observers, event)
case .error:
dispatch(observers, event)
}
}
在这里有两个比较关键的函数,一个是_synchronized_on
,另一个是dispatch
。_synchronized_on
是实现AsyncSubject
的关键函数,我们可以待会了解了其他细节之后再看。
dispatch
是一个在Bag+Rx.swift
中定义的一个内联函数,它的主要作用是给bag
内的所有(Event<E>) -> ()
闭包对象派发执行一个指定的事件(Event)。源码如下:
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
if bag._onlyFastPath {
bag._value0?(event)
return
}
let value0 = bag._value0
let dictionary = bag._dictionary
if let value0 = value0 {
value0(event)
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = dictionary {
for element in dictionary.values {
element(event)
}
}
}
注:在Swift中我们可以通过@ inline关键字来标识一个函数是内联函数。简单的来说,在Swift中我们有三种"内联策略": sometimes, nerver, always。
sometimes: 当我们申明一个函数的时候,默认这个函数的内联策略就是sometimes的。这个时候,swift的编译器会自动的为它所认为足够短小的函数增添上内联特性,而对于相对而言比较庞大的函数不使用内联特性,以此达到代码执行优化的目的。
always: 当我们需要某个函数强制内联的时候,我们只需要在函数之前加上@inline(__always)
关键字。当编译器检测到该关键字的时候,编译器就知道在这里永远都需要内联展开,就不会执行自己的那一套默认的内链优化策略。
nerver: 当我们需要某一个函数永远都不要进行内联的时候,我们只需要在函数之前加上@inline(never)
。那么,当编译器检测到该关键字的时候,编译器就知道在这里永远都不需要内联展开。
正如我们看到的,dsipatch
方法是基于Bag<(Event<E>) -> ()>
类型的容器来实现的,之所以之前有一堆复杂的判断逻辑,就是因为优化代码执行效率。当bag
中只有一个元素的时候,_onlyFastPath
为true
,那么我们只需要取出那一个执行操作就可以了。然而当我们超过一个,小于三十个的时候,我们会将元素存储在ContiguousArray
中,通过下标的方式来获取元素进行操作。而当容器内元素超过30这个阈值的时候,我们就要将元素插入到字典中,需要使用的时候再用键值对取出使用。
那么,_synchronized_on
到底是如何配合dispatch
来实现AsyncSubject
的特性的呢?
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
_lock.lock(); defer { _lock.unlock() }
if _isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
_lastElement = element
return (Observers(), .completed)
case .error:
_stoppedEvent = event
let observers = _observers
_observers.removeAll()
return (observers, event)
case .completed:
let observers = _observers
_observers.removeAll()
if let lastElement = _lastElement {
_stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
_stoppedEvent = event
return (observers, .completed)
}
}
}
当源序列发送next
事件的时候,AsyncSubject
仅仅使用内部的_lastElement
属性来记录下当前的next
事件,然后构造一个空的Bag来执行completed
事件(相当于没做什么事情)。
当源序列发送error
事件的时候,使用_stoppedEvent
来记录最后的最后的事件,然后构造一个observers
常量,将自身所有的观察者拷贝到observers
常量中,将自身所有的观察者移除,最后把observers
和该error
事件返回。
(四)Subject的意义
当然,除了AsyncSubject
之外,我们还有还有以下几种Subject
:
- PublishSubject: 标准的热信号,订阅者只会接收到订阅操作之后的事件。
- ReplaySubject:订阅者会接受到订阅之前的事件以及订阅之后的事件,类似于冷信号。
- BehaviorSubject:订阅之后首先会接收到最近一次发送的事件(如果最近没有发送,那么发送一个初始的事件)。
- Variable: 基于
BehaviorSubject
的封装,会将初始值或者最近的值发送给订阅者。
然而写到这里,我并不想一一详细的分析剩下四种的实现细节。因为,与刚刚分析完成的AsyncSubject
相比,其余的Subject
实现的方法都没有太大的区别。所以笔者也不想在这里流水账似的浪费时间。
不如做一些更有意思的事情:为什么我们需要Subject?
现在我们先不妨设想一个这样的场景:
我们需要追踪用户在iPhone上的每一次点击,当用户点击一次系统就会调用一次screenDidTapped(on point: CGPoint)方法。
那么在ReactiveX中,我们自然可以想到类似这样的做法:
var observer: AnyObserver<CGPoint>!
let tapped = Observable<CGPoint>.create { (observer) -> Disposable in
observer = observer
return Disposables.create()
}
func screenDidTapped(on point: CGPoint) {
observer.on(point)
}
然而这样的实现确实存在一些问题:
- 一对一的限制
由于Observables
的特性限制,如果我们希望有多个观察者来订阅该点击事件,那么Observables
是无法做到的。当你存在两个及以上的订阅的时候,只有最新的观察者可以接收到序列的事件信息。
- 订阅前的行为
还是由于Observables
的特性,create
构造器的闭包只会在第一次被订阅的时候会调用。然而当点击屏幕的时候,我们并不能保证就一定有观察者订阅了这个序列。
也就是说,当你遇到类似上述的情况的时候,你需要使用热信号(hot observeable
)。
Hot Observables VS Cold observables
虽然冷热信号已经是被讲烂的话题了,但是既然写到这里已经是不得不说的地步了。
Bnaya Eshet
在他的博客中对"冷热信号"有过一个非常形象的比喻:
if a tree falls in a forest and no one is around to hear it, does it make a sound? if it do make a sound when nobody observed it, we should mark it as hot, otherwise it should be marked as cold.
倘若一颗沙漠中的枯树黯然倾倒而无人问津,是不是可以说它从未对这个世界发出声音。倘若无人关心而算作发出了声音,那么它就是热信号,反之,则是冷信号。
我们再来看看冷热信号的对比:
Hot Observables | Cold observables |
---|---|
属于序列 | 属于序列 |
无论有或者没有被订阅,都会产生事件。 | 只有被订阅的时候才会产生事件。 |
变量、属性、点击操作、鼠标操作、UI的变化等 | 异步操作、HTTP连接、TCP连接等 |
通常包含N个Next事件 | 通常只有一个Next事件 |
数据源的变化能够作用到所有的订阅者。 | 数据源的变化只会作用到当前的订阅者。 |
它是有状态的 | 它是无状态的 |
在现实世界的编程中,我们总是面对着各种各样复杂的情景。绝大多数的情况之下,我们的信号流可以是纯函数的,不可变的,安全的。然而当我们面对诸如鼠标追踪,变量的流式表达的时候,不可避免的我们需要使用到热信号。
当我们需要使用到热信号的时候,我们再根据当前的环境选择最适合的Subject
。比如我们希望对数据有“回看”功能,那么我们就可以选择使用ReplaySubject
。如果我们只关心最后一个数据变化,那么我们可以使用AsyncSubject
,诸如此类等等。
(五)结语
对于Rx的使用者来说,我们更加希望使用的是Cold observables
。从函数式的角度来说,Cold observables
是不可变的,而Hot observables
是可变的。不可变的数据总是更加的符合人的心智模型,而更加易于维护和理解,同时也更加的安全。希望这篇文章可以加深读者对于Rx的理解。