本文主要尝试阐述
- 什么是 FRP
- RxSwift的几个基础概念
- RxSwift的最简单使用流程
- RxSwift是怎么通过基本流程来实现事件响应的
FRP
FRP全称为 Functional Reactive Programming, 即函数响应式编程。
函数式编程
强调每个程序都能够被反复分解为越来越小的模块单元,而所有这些块可以通过函数装配起来,以定义一个完整的程序。
响应式编程
在响应式编程当中,a:=b+c声明的是一种绑定关系。(a与b、c绑定起来了,所以b、c的变化会影响a,这也就是所谓【变化传播】)
RxSwift 的 基础概念
可观察序列 Observable
Observable 是RxSwift 框架的核心概念,它是RxSwift的数据源
请求接口的网络数据,可以定义成一个Observable a
请求数据库的数据,也可以定义成一个Observable b
而我们的界面显示的数据 c 可以绑定到 a + b的数据流上,即 c := a + b
我们可以想象到 c 也是一个 Observable ,
而 + 和 := 操作 在RxSwift中定义了相应的函数式
在RxSwift中只要实现了 ObservableType协议的类都可以认为是 Observable,
ObservableType的定义如下
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable
where Observer.Element == Element
}
从协议的定义可以认为,要成为一个可观察序列,只要实现subscribe方法即可,
而subscribe方法有一个Observer类型的参数,并且Observer需要实现ObserverType协议,下面我们会介绍观察者Observer
观察者 Observer
观察者,见名知意我们可以认为,它有一个被观察者,即Observable,通过普通的观察者模式,我们应该知道,一般观察者会定义一些行为来相应Observable的变化。我们来看看观察者的定义
public protocol ObserverType {
/// The type of elements in sequence that observer can observe.
associatedtype Element
@available(*, deprecated, message: "Use `Element` instead.")
typealias E = Element
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<Element>)
}
与Observable类似,要成为一个观察者,只要实现ObserverType协议即可。
以上介绍了RxSwift的两个基础概念, Observable和Observer
那么Rxswift是怎么通过这两个对象来处理业务的呢?
RxSwift的基本使用流程
// 1. 创建序列
let observable = Observable<Int>.create { (observer: AnyObserver<Int>) -> Disposable in
}
// 2. 订阅序列
let disposable = observable.subscribe(onNext: { (value) in
})
// 3. 资源回收
disposable.disposed(by: disposeBag)
RxSwift 处理事务的流程如上
- 创建一个可观察序列
- 通过观察者订阅可观察序列
- 将订阅的资源放入资源回收包中统一回收
举个例子,使用RxSwift来处理button的点击事件
button.rx.tap.subscribe(onNext: {
}).disposed(by: disposeBag)
为了验证流程, 我们需要查看button.rx.tap是否创建了一个Observable,来带 UIControl + Rx.swift文件,查看到源码如下
public func controlEvent(_ controlEvents: UIControl.Event) -> ControlEvent<()> {
let source: Observable<Void> = Observable.create { [weak control = self.base] observer in
MainScheduler.ensureRunningOnMainThread()
guard let control = control else {
observer.on(.completed)
return Disposables.create()
}
let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
observer.on(.next(()))
}
return Disposables.create(with: controlTarget.dispose)
}
.takeUntil(deallocated)
// ControlEvent其实也是一个特殊的可观察序列
return ControlEvent(events: source)
}
即button的点击事件处理的第一步,确实创建了一个Observable。同时我们要对点击事件处理的代码,也确实是写在观察者订阅序列的时候,并且也必须得通过disposed(by: disposeBag)方法回收资源。
RxSwift是怎么通过基本流程来实现事件响应的
首先我们来分析,在UIKit中点击事件的响应流程,即target-action模式
- 定义一个响应函数
- 通过 button.addTarget将响应函数作为button的事件处理函数
然后我们注意到,在rxswift的button.rx.tap的实现中,Observable的创建过程中,有下面这段代码块:
let controlTarget = ControlTarget(control: control, controlEvents: controlEvents) { _ in
observer.on(.next(()))
}
在ControlTarget类文件中我们会看到UIKit的target-action模式,并且可以知道当点击事件发生时就会执行ControlTarget初始化时传进去的闭包,即执行 observer.on(.next(()))
通过以上分析我们知道,当点击事件发生时就会执行 observer.on(.next(()))
,但是我们处理事件的业务却是写在Observable对象的subscribe
函数的参数闭包下面。
因此我们分析这两个闭包是否有关系呢?
或者我们猜测他们肯定有关系,才能保证流程是正确的。
同时我们会有一个疑问,observer是在哪里创建并且传入到Observable中去的呢?
分析RxSwift的源码,可以发现
extension ObservableType {
// MARK: create
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
- Observable create方法会创建一个内部私有类AnonymousObservable的对象
- AnonymousObservable继承自Producer类,而Producer实现了ObservableType协议要求的
subscribe
方法,subscribe方法会接收到一个观察者observer。 - 通过subscribe的参数,可以知道我们使用的订阅函数和Producer的subscribe并不是同一个函数
然后分析我们观察者订阅可观察序列时调用的subscribe函数
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
// 这里创建了一个observer
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
发现,原来观察者是通过订阅的时候创建的,并且在return语句时通过self.asObservable().subscribe(observer)传给Observable,这个self应该是AnonymousObservable的对象,这个subscribe即为Producer类定义的subscribe方法。
至此,我们可以确定,observer.on(.next(()))
方法确实是调用的订阅时subscribe的onNext闭包。
总结
Observer: onNext()
Observable -> AnonymousObservable -> Producer : subscribe(observer)
ObservableType: subscribe(onNext: ()->()), create() -> AnyAnonymousObservable
- Observable.create 可以拿到一个可观察序列,并且,可以通过闭包参数observer来发送数据
- Observable.create 创建的Observable其实是AnonymousObservable类型的(Observable的创建方法还有其他的,它们其实是生成了不同的ObservableType的实现类),创建的各种Observable其实都是继承自Producer
- 观察者订阅可观察序列,其实是ObservableType协议的默认实现,并且我们只要提供处理数据的闭包就行,observer在ObservableType默认实现的subscribe方法中就自动创建,并且会将我们提供的闭包给到创建的observer
- ObservableType协议的默认实现会将创建的observer传递给自己的实现类的对象
- Observable一旦发送数据,observer就会执行我们subscribe时提供的闭包