前言
这里的分类指的是按照实现原理分类,而不是按照功能进行分类,针对每一个分类选择一个具体类型,进行分析
Override subscribe operator
- 操作符:
never
,empty
,just
,error
- 特点:
- 通过简单重载
subscribe
方法达到目的, - 没有
Sink
这个概念,因为太简单了,都不需要Sink
- 通过简单重载
- 示例:
// class Just
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
// 无需经过Sink,直接发出信号
observer.on(.next(_element))
observer.on(.completed)
return Disposables.create()
}
Recursive scheduling operator
- 操作符:
of
,from
,range
,repeatElement
,generate
- 特点:
- 输入源都是一个
Sequence
, 所以其核心是一个迭代器 - 通过递归调度输出元素
- 输入源都是一个
- 示例:
// ObservableSequenceSink Class
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
print("scheduleRecursive: \(next)")
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
scheduleRecursive
: 线程递归调度方法
let next = mutableIterator.0.next()
: 迭代器迭代元素
create
这个之前详细讲过,创建一个AnonymousObservable
作为承载闭包的实体
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
do
这个很重要,重点讲一下
// simple Example
Observable.of("🍎", "🍐", "🍊", "🍋")
.do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed") })
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
do
这个操作符是专门用来处理副作用的,什么是副作用呢,打个简单的比方实现: 1 + 2 + 3 + 4 + 5, 我要在+3的时候,做一个额外的操作,我要改变一下ui的背景色,但是这一步对最终的结果没有任何影响,就可以使用do
操作符,这么做的好处能够提高代码可读性同时也良好体现了函数的单一性职责。好了现在看看实现原理。
// Do Class
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
_onSubscribe?()
let sink = DoSink(eventHandler: _eventHandler, observer: observer, cancel: cancel)
let subscription = _source.subscribe(sink)
_onSubscribed?()
let onDispose = _onDispose
let allSubscriptions = Disposables.create {
subscription.dispose()
onDispose?()
}
return (sink: sink, subscription: allSubscriptions)
}
这里的_source
即 original Observable,而这里的sink
指的就是DoSink
// DoSink Class
func on(_ event: Event<Element>) {
do {
try _eventHandler(event)
forwardOn(event)
if event.isStopEvent {
dispose()
}
}
catch let error {
forwardOn(.error(error))
dispose()
}
}
DoSink
会在执行on
事件的时候执行_eventHandler
,也就是最开始用户传进来的那个闭包。
Rx最核心的是什么就是响应式的编程, A -> B -> C -> D -> E,A事件的发生最终导致E事件的发生。那么本来相互孤立的事件如何建立联系呢? 答案就在 let subscription = _source.subscribe(sink)
这一句, 通过subscribe
将本来孤立的事情紧密的联系在一起,并且Rx隐藏了所以的细节,用户无需为此做大量的额外操作就能获得该功能。 每一个Observer 都不需要了解具体有多少个Observable,它只需要上个Observable是谁就可以了。整个事件看起来是这样:E subscribe D subscribe C subscribe B subscribe A,但是细分一点其实是这样:
E subscribe D
D subscribe C
C subscribe B
B subscribe A
不管整个事件流到底有多长,其核心构建就是 B和A,用算法归纳就是如下:
var e
while(e.hasObservable) {
e.observer(e.Observable)
e = e.Observable
}
这个很像单车的链条,不管链条多长,其最小的组成单元都是一个小链,有头尾两端,可以与其他的链在一起。
Deferred
推迟执行,它只在被订阅的时候才去创建Observable
,而create
是在一开始就创建Observable
,Deferred
每次被订阅都会创建一个新的Observable
,而create
被多次订阅都是同一个Observable
。
let disposeBag = DisposeBag()
var count = 1
let deferredSequence = Observable<String>.deferred {
print("Creating \(count)")
count += 1
return Observable.create { observer in
print("Emitting...")
observer.onNext("🐶")
observer.onNext("🐱")
observer.onNext("🐵")
return Disposables.create()
}
}
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
deferredSequence
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// output log
Creating 1
Emitting...
🐶
🐱
🐵
Creating 2
Emitting...
🐶
🐱
🐵
由于每次创建都是通过闭包创建新的Observable
,而闭包捕获的值count
在更新,所以两次运行结果不一样
实现原理
// DeferredSink Class
func run() -> Disposable {
do {
let result = try _observableFactory()
return result.subscribe(self)
}
catch let e {
forwardOn(.error(e))
dispose()
return Disposables.create()
}
}
这里可以看到每次订阅的时候都会通过_observableFactory
产生新的Observable