RxSwift 核心实现原理

原文链接

一直以来,响应式编程都是业界讨论的热门话题之一。为了推广响应式编程,ReactiveX 社区几乎为每一种编程语言设计实现了一种对应的响应式编程框架。RxSwift 就是针对 Swift 所开发的响应式框架。

关于 RxSwift,网上有不少相关的学习资料,但绝大多数都是 RxSwift 的使用说明,鲜有文章介绍 RxSwift 背后的设计原理。通过阅读源码,查阅资料,正向设计,我逐步理解了 RxSwift 的设计思想。因此,趁热打铁,记录并总结一下我的理解。

本文所实现的 RxSwift 代码已在 Github 开源——传送门。参照源码阅读本文效果更佳。

基础

RxSwift 主要蕴含了以下几种设计思想:

  • 发布-订阅模式
  • 流编程
  • 函数式编程

下面,我们依次来进行介绍。

发布-订阅模式

发布-订阅模式 是 RxSwift 所呈现的一种最直观思想。发布-订阅模式可以分为两个角色:发布者订阅者

订阅者的主要职责是:

  • 订阅:监听并处理某个事件。其本质就是向发布者注册一个处理某个事件的闭包。
  • 取消(订阅)

发布者的主要职责是:

  • 发布:分发某个事件
image

发布-订阅模式的基本原理是:

  • 订阅者调用发布者提供的订阅方法进行订阅,从而在发布者内部注册订阅者。
  • 发布者内部会维护一个订阅者列表。
  • 当发布者发布事件时,会遍历订阅者列表,执行其中的处理方法(将事件作为参数传递给闭包,并执行)。
image

举例

在日常开发中,发布-订阅模式是一种广泛被应用的设计模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件总线都是基于这种模式实现的。如下所示为 Flutter 中事件总线的一种实现方式,其中的代码逻辑基本遵循了上述所描述的发布-订阅模式的基本原理。

// 订阅者订阅内容签名
typedef void EventCallback(arg);

class EventBus {
    // 私有构造函数
    EventBus._internal();

    // 保存单例
    static EventBus _singleton = new EventBus._internal();

    // 工厂构造函数
    factory EventBus()=> _singleton;

    // 保存事件订阅者队列,key:事件名(id),value: 对应事件的订阅者队列
    var _emap = new Map<Object, List<EventCallback>>();

    // 添加订阅者,即订阅
    void on(eventName, EventCallback f) {
        if (eventName == null || f == null) return;
        _emap[eventName] ??= new List<EventCallback>();
        _emap[eventName].add(f);
    }

    // 移除订阅者,即取消
    void off(eventName, [EventCallback f]) {
        var list = _emap[eventName];
        if (eventName == null || list == null) return;
        if (f == null) {
            _emap[eventName] = null;
        } else {
            list.remove(f);
        }
    }

    // 触发事件,事件触发后该事件所有订阅者会被调用,即发布
    void emit(eventName, [arg]) {
        var list = _emap[eventName];
        if (list == null) return;
        int len = list.length - 1;
        // 反向遍历,防止订阅者在回调中移除自身带来的下标错位 
        for (var i = len; i > -1; --i) {
            // 执行注册的闭包
            list[i](arg);
        }
    }
}

// 定义一个 top-level(全局)变量,页面引入该文件后可以直接使用 bus
var bus = new EventBus();

思考:为什么 iOS 中一个监听了某个通知的类必须要在 dealloc 时要执行 removeObserver: 方法?

流编程

流(stream) 是 RxSwift 另一个重要的设计思想。Observable<T> 是 Rx 框架的基础,也被称为 可观察序列。它的作用是可以异步地产生一系列数据,即一个 Observable<T> 对象会随着时间的推移不定期地发出 event(element: T)。数据就像水流一样持续不断地在流动,顾名思义,这也被称为 流编程

关于流编程,《计算机程序的构造与解释》一书中认为 流编程是一种调用驱动的编程思想。流编程的基本思想是:一般情况下,只是部分地构造出流的结构,并将这样的部分结构传给使用流的程序。如果使用者需要访问这个流中未构造出的那个部分,那么这个流就会自动地继续构造下去,但是只做出满足当时需要的那一部分

如下所示是 RxSwift 中常见使用形式,observable 是数据源,不断地发出数据,如果水流一样,最终流向 subscribe 中的闭包。期间会流经 mapfilter 等操作符,经过转换或过滤。这就是流编程的思想。

let observable: Observable<Int> = Observable.create { (observer) -> Disposable in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onCompleted()
    return Disposables.create() 
}

observable.map { $0 + 2}.filter { $0 > 3 }.subscribe { (event) in
    ...
}

流编程底层实现的本质则是 闭包的延迟执行和强制执行。具体来说是基于一种称为 delay 的特殊形式,对于 delay <exp> 的求值不会对 <exp> 求值,而是返回一个称为 延时对象 的对象。它可以看做是对未来的某个时间求值 <exp> 的允诺。在各类编程语言中,返回特定类型的闭包常被用于描述一个延迟对象。与 delay 配对的是 force 的过程。它以一个延时对象为参数,执行相应的求值工作,即迫使 delay 完成其所允诺的求值。在各类编程语言中,执行返回特定类型的闭包常被用于描述 force 的过程。

函数式编程

RxSwift 提供了大量无副作用的操作符,无副作用也是函数式编程的一种重要特性。RxSwift 能够实现操作符的链式调用,一个重要的前提是:提供操作符的类型和操作符的返回类型必须保持一致。这里就涉及到了函数式编程中的一些高阶概念:函子(Functor)适用函子(Applicative)单子(Monad)。详细内容可参见《函数式编程——Functor、Applicative、Monad》一文。

不过,RxSwift 操作符中运用最多的是 函子(Functor),什么是函子?简而言之,函子能够将普通函数应用到一个包装类型。如下图及代码所示,一个包装类型包含了一个原始值,当函子作用在其上后,使用普通函数对原始值进行转换,最终将结果值放入包装类型中返回。

image
extension Result {
    // 满足 Functor 的条件:map 方法能够将 普通函数 应用到包装类
    func map<U>(_ f: (T) -> U) -> Result<U> {
        switch self {
        case .success(let x): return .success(f(x))
        case .failure: return .failure
        }
    }
}

以 RxSwift 中最常用的 map 操作符为例,如下所示。map 方法扩展自 ObservableType,其能够将普通函数 (Element) -> Result 应用到 ObservableType 包装类型。这其实就是一种典型的 函子 应用。

extension ObservableType {
    public func map<Result>(_ transform: @escaping (Element) throws -> Result)
        -> Observable<Result> {
        return Map(source: self.asObservable(), transform: transform)
    }
}

核心实现

下面我们将以正向设计的方式,结合 RxSwift 中的设计思想,手动实现一个 RxSwift 的核心部分。

基本功能(RxDemo-01)

首先,我们来定义事件,RxSwift 中有三种类型的事件,如下所示:

// MARK: - Event

enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

其次,我们来定义订阅者,在 RxDemo-01 中,我们先忽略 取消订阅 的功能,如下所示。订阅者必须遵循订阅者协议,需要实现 监听事件 的方法 on(event: Event<Element>)。订阅者内部维护一个处理事件的闭包 _handler。当监听事件方法触发时,会立即执行处理事件闭包。

// MARK: - Observer

protocol ObserverType {
    associatedtype Element
    
    // 监听事件
    func on(event: Event<Element>)
}

class Observer<Element>: ObserverType {
    
    // 处理事件的闭包
    private let _handler: (Event<Element>) -> Void
    
    init(handler: @escaping (Event<Element>) -> Void) {
        _handler = handler
    }
    
    // 实现 监听事件 的协议,内部处理事件
    func on(event: Event<Element>) {
        // 处理事件
        _handler(event)
    }
}

最后,我们来定义发布者,如下所示。发布者必须遵循发布者协议,需要实现 订阅操作 的方法 subscribe<O: ObserverType>(observer: O) where O.Element == Element。发布者内部维护一个发布事件的闭包 _eventGenerator。当订阅发生时(订阅操作方法被执行时),会立即执行发布事件的闭包。

// MARK: - Observable

protocol ObservableType {
    associatedtype Element
    
    // 订阅操作
    func subscribe<O: ObserverType>(observer: O) where O.Element == Element
}

class Observable<Element>: ObservableType {
    // 定义 发布事件 的闭包
    private let _eventGenerator: (Observer<Element>) -> Void
    
    init(eventGenerator: @escaping (Observer<Element>) -> Void) {
        _eventGenerator = eventGenerator
    }
    
    // 实现 订阅操作 的协议,内部发布事件
    func subscribe<O: ObserverType>(observer: O) where O.Element == Element {
        // 生成事件
        _eventGenerator(observer as! Observer<Element>)
    }
}

接下来,我们来试用一下 RxDemo-01 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式基本吻合。

// MARK: - Test

let observable = Observable<Int> { (observer) in
    print("send 0")
    observer.on(event: .next(0))
    print("send 1")
    observer.on(event: .next(1))
    print("send 2")
    observer.on(event: .next(2))
    print("send 3")
    observer.on(event: .next(3))
    print("send completed")
    observer.on(event: .completed)
}

let observer = Observer<Int> { (event) in
    switch event {
    case .next(let value):
        print("recive \(value)")
    case .error(let error):
        print("recive \(error)")
    case .completed:
        print("recive completed")
    }
}

observable.subscribe(observer: observer)
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// send completed
// recive completed

下图所示为 RxDemo-01 实现的 RxSwift 的内部调用关系。通过闭包实现将 observer 传递给 observable,在发布事件时能够将事件传递给 observer,从而形成一条看似自左向右流动的数据流。

image

取消订阅(RxDemo-02)

RxDemo-01 有一个明显的缺陷——无法取消订阅。我们来参考 RxSwift 的实现,它 并不是直接让订阅者支持取消订阅,而是通过一个第三方类型 Disposable 对订阅进行管理Disposable 的核心作用是 提供一个状态位标识订阅是否已经取消

关于由第三方类来管理订阅,而不是让订阅者自己管理的原因,我猜测有两个:一是出于职责单一的原则;二是为了支持函数式编程,抽取一个第三方类型作为返回值,从而在链式调用时保持类型一致。

Disposable 协议要求所有的 Disposable 类型都实现 dispose 方法,表示取消订阅。

protocol Disposable {
    // 取消订阅
    func dispose()
}

这里我们定义两个遵循 Dispoable 协议的类型:AnonymousDisposableCompositeDisposable

AnonymousDisposable 作为一个匿名 Disposable,在本例中作为最底层的 Disposable 并没有什么作用,只是为了实现模式统一而已。

class AnonymousDisposable: Disposable {
    // AnonymousDisposable 封装了 取消订阅 的闭包
    private let _disposeHandler: () -> Void
    
    init(_ disposeClosure: @escaping () -> Void) {
        _disposeHandler = disposeClosure
    }
    
    func dispose() {
        _disposeHandler()
    }
}

CompositeDisposable 作为一个可管理多个 Disposable 的容器,它内部维持一个标志位表示订阅是否被取消。CompositeDisposable 所实现的 dispose 方法真正改变了标志位,并对其所维护的所有 Disposable 执行各自的 dispose 方法,从而完成它们定义的在取消订阅时需要执行的附带操作。

class CompositeDisposable: Disposable {
    // 可用于管理一组 Disposable 的 CompositeDisposable

    // 判断是否已销毁(取消订阅)的标志位
    private(set) var isDisposed: Bool = false
    // 管理一组 Disposable
    private var disposables: [Disposable] = []
    
    init() {}
    
    func add(disposable: Disposable) {
        if isDisposed {
            disposable.dispose()
            return
        }
        disposables.append(disposable)
    }
    
    func dispose() {
        guard !isDisposed else { return }
        // 销毁所有 disposable,并设置标志位
        disposables.forEach {
            $0.dispose()
        }
        isDisposed = true
    }
}

很显然,这里执行 dispose 操作只是修改了状态,并没有释放订阅资源。只有当 CompositeDisposable 对象被释放后才算真正释放资源。在原版 RxSwift 中,DisposeBag 差不多就是 CompositeDisposable。这样也是为什么我们要把订阅交给 DisposeBag 来进行管理,DisposeBag 作为某个对象的属性,会随着对象的释放,从而自动释放真正的订阅资源。

为了支持 Disposable,我们需要在 RxDemo-01 的基础上稍作修改即可,主要是修改发布者 Observable

// MARK: - Observable

protocol ObservableType {
    associatedtype Element
    
    // 订阅操作
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element
}

class Observable<Element>: ObservableType {
    // 定义 发布事件 的闭包
    private let _eventGenerator: (Observer<Element>) -> Disposable
    
    init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
        _eventGenerator = eventGenerator
    }
    
    // 实现 订阅操作 的协议,内部生成事件
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
        let composite = CompositeDisposable()
        // 通过一个中间 Observer 对原始 Observer 进行封装,用于过滤事件的传递。
        let disposable = _eventGenerator(Observer { (event) in
            guard !composite.isDisposed else { return }
            // 事件传递给原始 observer
            observer.on(event: event)
            // 通过 composite 管理 error、completed 时,自动取消订阅
            switch event {
            case .error(_), .completed:
                composite.dispose()
            default:
                break
            }
        })
        // 将 _eventGenerator 返回的 AnonymousDisposable 加入至 CompositeDisposable 中进行管理
        composite.add(disposable: disposable)
        return composite
    }
}

接下来,我们来试用一下 RxDemo-02 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式进一步吻合。这里,我们实现了取消订阅的功能。

// MARK: - Test

let observable = Observable<Int> { (observer) -> Disposable in
    print("send 0")
    observer.on(event: .next(0))    // observer.on(event: .next(0).map({ $0 * 2 }))
    print("send 1")
    observer.on(event: .next(1))
    print("send 2")
    observer.on(event: .next(2))
    print("send 3")
    observer.on(event: .next(3))
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        print("send completed")
        observer.on(event: .completed)
    }
    return AnonymousDisposable {
        print("dispose")
    }
}

let observer = Observer<Int> { (event) in
    switch event {
    case .next(let value):
        print("recive \(value)")
    case .error(let error):
        print("recive \(error)")
    case .completed:
        print("recive completed")
    }
}

let disposable = observable.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    disposable.dispose()
}

// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// dispose
// send completed

RxDemo-02 的核心思想是在 RxDemo-01 的基础上对事件进行拦截和过滤,如下图所示。

image

具体的实现方式如下所示,通过 CompositeDisposable 管理 AnonymousDisposable(原始 subscribe 中的闭包执行后的返回类型)。同时,在执行发布事件时,使用一个中间 Observer 接收原始事件,中间 Observer 引用外部 CompositeDisposable 的状态决定是否将事件发送给原始 Observer。

image

使用 CompositeDisposable 的本质就是添加了一个中间层来解决管理订阅的问题。

image

结构优化(RxDemo-03)

在 RxDemo-02 中,AnonymousObserver 引用了外部的 CompositeDisposable 中的订阅状态,从而决定事件的传递方向。这种代码逻辑由内而外实现,并不是很直观。

为了更加清晰地描述这个事件流动方向,RxDemo-03 通过增加一个中间层,将原始 observer、中间 observer、事件转发逻辑聚合在同一层级下,让代码具有更好的可读性。这里我们实现一个遵循 Disposable 协议的 Sink 类型。Sink水槽 的意思,象征着这里我们通过它来控制事件的流动方向,暗示了这个类的作用。

class Sink<O: ObserverType>: Disposable {
    private var _disposed: Bool = false
    private let _forward: O
    private let _eventGenerator: (Observer<O.Element>) -> Disposable
    private let _composite = CompositeDisposable()
    
    init(forward: O, eventGenerator: @escaping (Observer<O.Element>) -> Disposable) {
        _forward = forward
        _eventGenerator = eventGenerator
    }
    
    func run() {
        // 通过一个中间 Observer 接收原始事件
        // 根据 CompositionDisposable 的状态决定是否传递给原始 Observer
        let observer = Observer<O.Element>(forward)
        // 执行事件生成器
        // 将返回值 Disposable 加入到 CompositeDisposable 中进行管理
        _composite.add(disposable: _eventGenerator(observer))
    }
    
    private func forward(event: Event<O.Element>) {
        guard !_disposed else { return }
        // 事件传递给原始 observer
        _forward.on(event: event)
        // 通过 composite 管理 error、completed 时,自动取消订阅
        switch event {
        case .completed, .error(_):
            dispose()
        default:
            break
        }
    }
    
    func dispose() {
        _disposed = true
        _composite.dispose()
    }
}

在定义了 Sink 之后,我们就可以简化 Observablesubscribe 方法的具体实现。

class Observable<Element>: ObservableType {
    // 定义 发布事件 的闭包
    private let _eventGenerator: (Observer<Element>) -> Disposable
    
    init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
        _eventGenerator = eventGenerator
    }
    
    // 实现 订阅操作 的协议,内部生成事件
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
        let sink = Sink(forward: observer, eventGenerator: _eventGenerator)
        sink.run()
        return sink
    }
}

SinkDisposable 引用了原始的事件发生器,并定义一个中间 Observer 转入至原始事件发生器,从而让中间 observer 接收原始事件。除此之外,SinkDisposable 还引用了原始 observer,当中间 observer 处理原始事件时,会判断订阅是否已经取消,从而决定是否将原始事件转发给原始 observer。此时,RxDemo-03 实现的 RxSwift 的内部调用关系如下所示。

image

操作符(RxDemo-04)

下面,我们来实现操作符,RxSwift 中包含了大量的操作符,它们基本上都是对函数式编程中 函子单子 等进行了应用。我们以 map 为例进行介绍。

如下所示,map 方法能够将一个普通函数应用到包装类型 ObservableType 上。map 方法最终返回一个 Observable<Result> 类型(同样遵循 ObservableType 协议),因此能够完美支持链式操作。

extension ObservableType {
    func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
        return Observable<Result> { (observer) in   // observer 为原始 observer
            // 此闭包可看成是一个 eventGenerator
            // 向原始 observable 中传入一个中间 map observer,即由中间 map observer 替换原始 observer 监听原始事件
            // 中间 map observer 对原始事件进行转换后,转发给原始 observer
            return self.subscribe(observer: Observer { (event) in
                switch event {
                case .next(let element):
                    do {
                        try observer.on(event: .next(transform(element)))
                    } catch {
                        observer.on(event: .error(error))
                    }
                case .error(let error):
                    observer.on(event: .error(error))
                case .completed:
                    observer.on(event: .completed)
                }
            })
        }
    }
}

接下来,我们来试用一下 RxDemo-04 所实现的 RxSwift。这里,我们实现了 map 操作符的功能。

// MARK: - Test

let observable = Observable<Int> { (observer) -> Disposable in
    print("send 0")
    observer.on(event: .next(0))    // observer.on(event: .next(0).map({ $0 * 2 }))
    print("send 1")
    observer.on(event: .next(1))
    print("send 2")
    observer.on(event: .next(2))
    print("send 3")
    observer.on(event: .next(3))
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        print("send completed")
        observer.on(event: .completed)
    }
    return AnonymousDisposable {
        print("dispose")
    }
}

let observer = Observer<Int> { (event) in
    switch event {
    case .next(let value):
        print("recive \(value)")
    case .error(let error):
        print("recive \(error)")
    case .completed:
        print("recive completed")
    }
}

let disposable = observable.map { $0 * 2 }.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    disposable.dispose()
}

// send 0
// recive 0
// send 1
// recive 2
// send 2
// recive 4
// send 3
// recive 6
// dispose
// send completed

RxDemo-04 中由于增加了操作符功能,其内部的调用关系也发生了变化。特别是,发布事件和过滤事件的逻辑,很显然,每增加一个操作符,就会增加一个 SinkDisposable 中间层,调用栈也会更深。

image

如果我们仔细分析 RxDemo-04,其实我们可以发现内部隐藏了如下所示的调用关系链:observable -> MapObserver -> MapObservable -> observer。实际执行时,调用关系如下所示:

  1. observable 调用 MapObserver.on 方法,将原始事件传递给 MapObserver;
  2. MapObserver 使用 map 方法将原始事件转换成 map 事件(即 map 后的数据),作为 MapObservable 发出的事件;
  3. MapObservable 调用 observer.on 方法,将 map 事件传给 observer。
image

分类细化(RxDemo-05)

在 RxDemo-04 中,为了增加 map 操作符,对 ObservableType 进行了扩展,其本质就是在原始的 Observable 和 Observer 之间插入了一个 MapObserver 和一个 MapObservable。本节,我们继续进行优化,对中间类也进行细分和定义。

在 RxDemo-04 中,Sink 的主要作用是 根据订阅是否取消决定是否拦截事件的传递。这里我们可能会想到:中间订阅者(如:MapObserver)本身是不是就应该具备 Sink 的这种功能呢?事实上,RxSwift 就是让 Sink 作为所有 Observer 的基类。

对于操作符,我们可以实现上述的模式;但是,对于不带操作符的情况,该如何处理呢?为了实现模式的统一,我们可以认为不带操作符的情况等同于带了 只返回原值的匿名操作符,即等同于 map { $0 }。针对此情况,我们也需要定义两个类 AnonymousObserverAnonymousObservable

下面,我们来进行优化改进。

首先,我们将原来的 Observable<Element> 改成抽象基类。

// 抽象类
class Observable<Element>: ObservableType {
//    // 定义 发布事件 的闭包,有子类来定义
//    private let _eventGenerator: (Observer<Element>) -> Disposable
//
//    init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
//        _eventGenerator = eventGenerator
//    }

    // 实现 订阅操作 的协议,内部生成事件
    func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
        rxAbstractMethod()
    }
}

func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    fatalError("Abstract Method", file: file, line: line)
}

其次,我们再一定一个新的类 Producer<Element> 代替 Observable<Element>Producer<Element> 继承自 Observable<Element>,作为发布者的基类,该类内部没有时间生成器 eventGenerator 的闭包,由子类选择性进行定义。

class Producer<Element>: Observable<Element> {
    // 实现 订阅操作 的协议,内部生成事件
    override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
        return run(observer: observer)
    }
    
    func run<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
        rxAbstractMethod()
    }
}

然后,我们来对 Sink 进行修改。在 RxDemo-04 中,Sink 即提供了事件生成的功能和事件转发的功能。这里,我们让 Sink 的职责更加单一,仅仅是提供事件转发的功能。修改结果如下:

class Sink<O: ObserverType>: Disposable {
    private var _disposed: Bool = false
    private let _forward: O
    private let _composite = CompositeDisposable()
    
    init(forward: O) {
        _forward = forward
    }
    
    func forward(event: Event<O.Element>) {
        guard !_disposed else { return }
        // 事件传递给原始 observer
        _forward.on(event: event)
        // 通过 composite 管理 error、completed 时,自动取消订阅
        switch event {
        case .completed, .error(_):
            dispose()
        default:
            break
        }
    }
    
    func dispose() {
        _disposed = true
        print("dispose execute")
        _composite.dispose()
    }
}

为了便于泛型类型的转换,我们给 ObservableType 协议增加一个方法,并由 Observable<Element> 予以实现。

protocol ObservableType {
    // ...
    
    func asObservable() -> Observable<Element>
}
class Observable<Element>: ObservableType {
    // ...

    func asObservable() -> Observable<Element> {
        return self
    }
}

接下来,我们来分别实现 Sink 的子类 AnonymousObserverMapObserver 以及 Producer 的子类 AnonymousObservableMapObservable

class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType {
    typealias Element = O.Element
    
    override init(forward: O) {
        super.init(forward: forward)    // forward 为原始订阅者
    }
    
    func on(event: Event<Element>) {
        // 对原始事件进行转发
        switch event {
        case .next(let element):
            self.forward(event: .next(element))
        case .error(let error):
            self.forward(event: .error(error))
            self.dispose()
        case .completed:
            self.forward(event: .completed)
            self.dispose()
        }
    }
    
    func run(parent: AnonymousObservable<Element>) -> Disposable {
        // 执行事件生成器
        parent._eventGenerator(Observer(self))
    }
}

class AnonymousObservable<Element>: Producer<Element> {
    // 持有事件生成器闭包
    let _eventGenerator: (Observer<Element>) -> Disposable
    
    init(eventGenerator: @escaping (Observer<Element>) -> Disposable) {
        self._eventGenerator = eventGenerator
    }
    
    override func run<O>(observer: O) -> Disposable where Element == O.Element, O : ObserverType {
        // 订阅发生时,生成一个中间订阅者 AnonymousObserver 来订阅原始事件,并将事件转发给原始订阅者
        let sink =  AnonymousObserver(forward: observer)
        sink.run(parent: self)
        return sink
    }
}
class MapObserver<Source, Result, O: ObserverType>: Sink<O>, ObserverType {
    typealias Element = Source
    typealias Result = O.Element
    typealias Transform = (Source) throws -> Result
    private let _transform: Transform
    
    init(forward: O, transform: @escaping Transform) {  // forward 为原始订阅者
        self._transform = transform
        super.init(forward: forward)
    }
    
    func on(event: Event<Element>) {
        // 对原始事件进行 map 转换,对结果进行转发
        switch event {
        case .next(let element):
            do {
                let mappedElement = try _transform(element)
                self.forward(event: .next(mappedElement as! O.Element))
            } catch {
                self.forward(event: .error(error))
            }
        case .error(let error):
            self.forward(event: .error(error))
            self.dispose()
        case .completed:
            self.forward(event: .completed)
            self.dispose()
        }
    }
}

class MapObservable<Source, Result>: Producer<Result> {
    typealias Transform = (Source) throws -> Result
    private let _transform: Transform
    private let _source: Observable<Source>
    
    init(source: Observable<Source>, transform: @escaping Transform) {
        self._source = source
        self._transform = transform
    }

    override func run<O: ObserverType>(observer: O) -> Disposable where Result == O.Element {
        // 订阅发生时,生成一个中间订阅者 MapObserver 来订阅上游事件
        let sink = MapObserver(forward: observer, transform: self._transform)
        self._source.subscribe(observer: sink)
        return sink
    }
}

现在我们再来看发布者、操作符,其实两者的本质都是一样,都是创建了一个发布者。对此,我们采用扩展的方式来提供相应的方法。如下所示:

extension ObservableType {
    static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(eventGenerator: eventGenerator)
    }
}

extension ObservableType {
    func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
        return MapObservable(source: self.asObservable(), transform: transform)
    }
}

最后,我们再来验证一下实现结果,如下所示。

// MARK: - Test

let observable = Observable<Int>.create { (observer) -> Disposable in  // observer 为 MapObserver
    print("send 0")
    observer.on(event: .next(0))
    print("send 1")
    observer.on(event: .next(1))
    print("send 2")
    observer.on(event: .next(2))
    print("send 3")
    observer.on(event: .next(3))
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        print("send completed")
        observer.on(event: .completed)
    }
    return AnonymousDisposable {
        print("dispose")
    }
}

let observer = Observer<Int> { (event) in
    switch event {
    case .next(let value):
        print("recive \(value)")
    case .error(let error):
        print("recive \(error)")
    case .completed:
        print("recive completed")
    }
}

let disposable = observable.map { $0 * 2 }.map { $0 + 1 }.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    disposable.dispose()
}

// send 0
// recive 1
// send 1
// recive 3
// send 2
// recive 5
// send 3
// recive 7
// dispose execute
// send completed

从运行结果而言,RxDemo-05 基本实现了分类细化,并且达到了取消订阅的功能。此时,上述例子中实际的订阅关系如下所示。相比 RxDemo-04,多了 AnonymousObserverAnonymousObservable,但是整体的内部订阅关系链更加清晰了。

image

此时,我们再来对照一下 RxSwift 和 RxDemo-05 中类的定义,如下表所示。各个类的功能基本相同,整体结构也是大同小异,只是在类名上略有差异。

RxDemo-05 Producer Sink AnonymousObserver AnonymousObservable MapObserver MapObservable
RxSwift Producer Sink AnonymousObservableSink AnonymousObservable Map MapSink

订阅管理(RxDemo-06)

如果,我们仔细对比,可以发现 RxSwift 中 Producersubscribe 方法内部与 RxDemo-05 还是不太一样,前者内部还引用了一个 SinkDisposer 类。其作用是什么呢?事实上,其主要作用是管理 disposeHandler。细心的同学可能会发现 RxDemo-05 中有个 BUG:取消订阅时没有执行 print("dispose") 闭包。

对此,我们也可以用类似的方式来解决,通过增加一个 Diposer 类来进行管理。具体代码见:RxDemo-06。

当订阅发生时(即执行 subscribe 方法时),内部会产生一个递归的控制流,如下图所示。

image

通过递归返回的方式构建整个订阅管理关系链,如下图所示。diposer0subscribe 方法最终返回的 Disposable 对象。当我们对 disposer0 执行 dispose 方法时,内部会递归地执行 dispose 方法,最终取消订阅链中所有的订阅。

注意:这里面会有循环引用,如 MapObserver 内部又引用了 Disposer0,RxDemo-06 以及 RxSwift 中的处理是给 Disposer 类内部添加一个状态,表示是否已经取消了订阅,从而避免循环引用导致的循环调用。这里的循环引用并不一定是坏事,它在下述的场景下起到了非常关键的作用。

image

当事件中出现一个 completeerror 事件时,由于事件会依次传递至 Observer,最后一次传递时,即 MapObserver 进行传递时,会判断是否是 completeerror 事件,从而决定是否执行 dispose 方法。当 MapObserver 执行 dipose 方法时,会通过上述的循环引用,调用 Disposer0 执行 dispose 方法,从而实现整体取消订阅。

总结

本文通过逐步实现 RxSwift 核心部分中的功能,一窥其背后的设计思路。从中我们也看到了其对函数式编程的应用,以及其所呈现出来的流编程模式的底层实现原理。

后续,我们将进一步探索原版 RxSwift 中其他的一些内容。

参考

  1. RxSwift repo
  2. ReactiveX
  3. 函数式编程——Functor、Applicative、Monad
  4. 《计算机程序的构造与解释》
  5. Modern RxSwift Architectures
  6. Learn Rx by implementing Observable
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容