本篇接着上篇函数响应式编程思想 & RxSwift 核心逻辑(一)继续详细分析上篇没有提到的地方。
序列继承链
let ob = Observable<Any>.create
这句代码创建的是类AnonymousObservable
匿名可观察序列, 它的父类和层级关系如下:
类 AnonymousObservable --> 类 Producer --> 类 Observable --> 协议 ObservableType --> 协议 ObservableConvertibleType
先从上往下说,比较好理解一点。
协议ObservableConvertibleType
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<E>
}
看注释可以明白,使用了关联类型提供了一个可转换为可观察序列Observable的方法,这个方法在遵守这个协议的类中实现即可。
这是最底层的协议,即可满足“万物皆序列”的目的。例如:
UISwitch().rx.value.asObservable();
因为value是结构体ControlProperty类型的,而ControlProperty底层又遵守了ObservableConvertibleType协议,所以最后value可以被转换为一个可观察序列。
协议ObservableType
顾名思义,这是一个可观察序列协议,于是目前提供了一个每个可观察序列都一定会有的订阅方法:
- returns: Subscription for `observer` that can be used to cancel production of sequence elements and free resources.
*/
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
类Observable
遵守ObservableType协议的可观察序列,这是所有序列的基类。所有这里面实现的方法,都是所有序列都必须要实现的基本方法。
public class Observable<Element> : ObservableType { // Observable 可观察序列,所有序列的基类
/// Type of elements in sequence.
public typealias E = Element // 起别名
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal() // 初始化的时候,类似于 引用计数 +1
#endif
}
// 协议的实现
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod() // 抽象方法,父类不实现,子类去实现
}
public func asObservable() -> Observable<E> { // 返回的是一个 Observable对象
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal() // 销毁的时候,类似于 引用计数 -1 ; 从而根据进入一个页面前,再退出一个页面后类似的操作,总共引用计数的变化来判断是否有内存泄漏问题
#endif
}
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯ 想优化协议扩展,但是swift不支持,虽然不知道他想具体怎么优化
/// Optimizations for map operator 优化map函数
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
上面注释写的很清楚了,主要是三个功能:
- 便于内存管理,实现了当Observable初始化和销毁的时候,分别实现Resources.incrementTotal()方法和Resources.decrementTotal()方法
- 实现了遵守的协议方法subscribe抽象方法,但是并没有具体实现,交给子类去实现
- 实现了协议方法asObservable(),使其子类调用这个方法都能返回一个可观察序列Observable
类Producer
Producer继承自Observable,主要是具体实现了父类的订阅方法subscribe
,并且提供了一个run
方法,但是并没有具体实现,交给子类去实现。
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired { // CurrentThreadScheduler 调度者
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer // 销毁
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod() // 抽象方法,父类不实现,子类去实现
}
}
类AnonymousObservable
主要是保存了属性_subscribeHandler,和具体实现了父类Producer中的run方法
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable // 起个别名为SubscribeHandler
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) { // 带了一个SubscribeHandler参数的初始化
self._subscribeHandler = subscribeHandler // 保存了订阅管理者
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 这里传进去了 订阅者 销毁者
let subscription = sink.run(self) // 这里的self就是 AnonymousObservable 这里传进去了 可观察序列,即 ob
return (sink: sink, subscription: subscription)
}
}
订阅者继承链和订阅发送流程
订阅者继承链
在let _ = ob.subscribe(onNext: { (text) in
这句代码里面创建了一个AnonymousObserver内部订阅者。它的继承关系如下:
类AnonymousObserver --> 类ObserverBase --> 协议Disposable,协议ObserverType
还是从上往下说,好梳理一些:
协议ObserverType
顾名思义这是一个订阅者的底层基本协议
public protocol ObserverType {
/// The type of elements in sequence that observer can observe.
associatedtype E
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<E>)
}
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element)) // 这里的on,是遵守了ObserverType协议的 AnonymousObservableSink的on方法 即管子来处理结果
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed) // 管子来处理结果
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error)) // 管子来处理结果
}
}
从上面代码中可以看出,主要是提供了一个所有订阅者都会有的事件处理on
方法。以及扩展的序列会出现的Next
,Completed
和Error
三种状态,只不过这里on
的方法是遵守这个协议的类的子类实现的on
方法。
协议Disposable
/// Represents a disposable resource.
public protocol Disposable {
/// Dispose resource.
func dispose()
}
提供了一个释放资源,即垃圾回收的方法。
类ObserverBase
顾名思义,这是所有订阅者的基类:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) { // 这里的类型必须是前面传进来的类型,因为swift是一门强类型语言
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
func onCore(_ event: Event<E>) {
rxAbstractMethod() // 抽象方法,交给子类具体实现
}
func dispose() {
fetchOr(self._isStopped, 1) // 当计数为0的时候,就回收
}
}
从上面可以看出,主要是提供了下面三个功能:
- 提供了
onCore
方法,但是并没有具体实现,交给子类具体实现 - 实现on方法,根据序列状态
next
,error
,completed
,调用onCore
方法,并将事件传过去,交由子类具体实现 - 实现了垃圾回收的方法
类AnonymousObserver
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void // 起别名
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler // 保存了事件管理者,保存的即是前面AnonymousObserver大括号内的内容
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event) //
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
从上面可以看出,主要实现的功能是:
- 在类初始化的时候,保存了_eventHandler对象
- 和Observable一样,分别实现Resources.incrementTotal()方法和Resources.decrementTotal()方法来管理内存
- 重写onCore方法
订阅和发送流程:
1、在订阅的这句let _ = ob.subscribe(onNext: { (text) in
代码中创建了订阅者observer
2、接下来,到这句
return Disposables.create(
self.asObservable().subscribe(observer), // self.asObservable() 这个即是序列ob。这句代码回到上面的 ob序列者的闭包
disposable
)
即走到了Producer
的subscribe
方法中的run
方法
3、来到了AnonymousObservable.run
方法中的sink.run
方法
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 这里传进去了 订阅者 销毁者
let subscription = sink.run(self) // 这里的self就是 AnonymousObservable 这里传进去了 可观察序列,即 ob
return (sink: sink, subscription: subscription)
}
4、这时就来到了
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self)) // 这里的self是 AnonymousObservableSink
}
5、这时来到了AnyObserver的初始化方法
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on // 这个后面的observer是AnonymousObservableSink,将AnonymousObservableSink的on函数保存在了常量 self.observer 中
}
6、这时来到了创建序列时的闭包
let ob = Observable<Any>.create { (obserber) -> Disposable in // 这里的obserber是AnyObserver类型
// 3:发送信号
obserber.onNext("框架班级") //这一句
obserber.onCompleted()
// obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
return Disposables.create()
}
中onNext这句
7、这时来到了AnyObserver的on方法
public func on(_ event: Event<Element>) {
return self.observer(event) // 这里的self.observer是上面保存的AnonymousObservableSink的on函数,并将event传进去
}
8、最后来到了管道中的forwardOn方法
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event) // 这里的self._observer 即是订阅者AnonymousObserver
}
9、最后就来到了AnonymousObserver的on方法的实现中
let observer = AnonymousObserver<E> { event in
//保存了事件,这整个大括号是作为一个逃逸闭包参数,初始化传递给了AnonymousObserver。并且将其赋值给新的常量observer,保存起来
//最后传给了管子
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
走到.next方法中,最后打印“订阅到:框架班级”
总结的简单流程图如下:
总的流程:
RxSwift设计思维之我想
- 最底层的是协议,每个协议都有自己独有的方法,可以遵守多个协议,方便后期扩展
- 使用了一层一层的继承关系,最底层的基类实现所有类都要实现的基本方法,可以实现协议的方法,但是只是一个抽象方法,具体方法到某个具体的子类去实现
- 使用了AnonymousObservableSink中间层来实现可观察序列AnonymousObservable和订阅者AnonymousObserver之间的通信。类似于其他框架中的manager
- 可以在每个基类的初始化init和销毁方法中deinit实现监控内存管理的方法
- 参数起名还是尽量准确一点,很多地方observer表示的是Sink或别的,但是RxSwift还起名为observer,不便于理解