Observable
RxSwift 的核心就是围绕 Observable 进行的一系列创建、发送、变换、组合、销毁等操作。
创建
// Builds a sequence that emits 0 and 1 to its observer and then completes.
// `create` takes a closure that receives the observer and returns a Disposable.
let sub = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onCompleted()
    // Nothing has to be torn down for this synchronous sequence.
    return Disposables.create()
}
Observable
Observable 遵循(实现)ObservableType 协议:
// Excerpt from `class Observable`.

/// Subscribes `observer` to this sequence.
///
/// The base implementation only calls `rxAbstractMethod()`; a subclass is
/// expected to override it with a real implementation.
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
    rxAbstractMethod()
}

/// An `Observable` is already an observable sequence, so it returns itself.
public func asObservable() -> Observable<E> {
    return self
}
ObservableType 继承 ObservableConvertibleType, ObservableType 协议约定了两个方法,
subscribe
asObservable
subscribe 是接收 events 的方法;asObservable 用于把 ObservableType 转换为 Observable(ObservableType to Observable)。
/// Represents a push style sequence.
public protocol ObservableType : ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Subscribes `observer` to the sequence and returns the `Disposable`
    /// that represents the subscription.
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

extension ObservableType {
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<E> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}
ObservableConvertibleType 声明了关联类型 E,并约定了 asObservable 函数,用于返回对应的 Observable<E>(Observable 自身的实现直接返回 self)。
/// A type that can be converted to an `Observable` sequence.
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Converts `self` to `Observable` sequence.
    func asObservable() -> Observable<E>
}
-
ObserverType 协议提供了关联类型 E(泛型),并约定了 on 方法;onNext、onCompleted、onError 则由协议扩展提供,它们都通过 on 发送对应的 event。
/// Supports push-style iteration over an observable sequence.
public protocol ObserverType {
    /// Type of elements carried by the events this observer receives.
    associatedtype E

    /// Delivers `event` to the observer.
    func on(_ event: Event<E>)
}

extension ObservableType {
    /// All internal subscribe calls go through this method.
    func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        return self.asObservable().subscribe(observer)
    }

    /// Subscribes an event handler closure to the sequence.
    ///
    /// - Parameter on: Closure invoked with every event of the sequence.
    /// - Returns: The `Disposable` produced by subscribing the wrapping observer.
    public func subscribe(_ on: @escaping (Event<E>) -> Void) -> Disposable {
        let observer = AnonymousObserver { e in
            // Forward the event to the handler.
            on(e)
        }
        return self.subscribeSafe(observer)
    }
}

/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
    /// Sends a `.next` event carrying `element`.
    public final func onNext(_ element: E) {
        on(.next(element))
    }

    /// Sends a `.completed` event.
    public final func onCompleted() {
        on(.completed)
    }

    /// Sends an `.error` event carrying `error`.
    public final func onError(_ error: Swift.Error) {
        on(.error(error))
    }
}
Create
- create 是对Observable的扩展,返回AnonymousObservable ,AnonymousObservable继承Producer ,Producer继承Observable,Producer主要是重写subscribe 函数,负责run执行分发事件和通过SinkDisposer 销毁事件
- SubscribeHandler 是一个执行 传入AnyObserver ,返回Disposable 的闭包,在create时候,初始化。
/// Creates an observable sequence from a subscribe closure.
///
/// - Parameter subscribe: Closure that receives an `AnyObserver<E>` and
///   returns a `Disposable`.
/// - Returns: An `AnonymousObservable` wrapping `subscribe`, typed as `Observable<E>`.
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    let source: Observable<E> = AnonymousObservable(subscribe)
    return source
}
// AnonymousObservable: creates one sink per `run` call and starts it.
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
    /// Wraps `observer` in an `AnonymousObservableSink` and runs that sink
    /// against this observable.
    ///
    /// - Parameters:
    ///   - observer: Observer handed to the new sink.
    ///   - cancel: Cancelable handed to the new sink.
    /// - Returns: The sink itself plus the disposable returned by `sink.run(self)`.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let anonymousSink = AnonymousObservableSink(observer: observer, cancel: cancel)
        return (sink: anonymousSink, subscription: anonymousSink.run(self))
    }
}
// AnonymousObservableSink: hands itself to the parent's subscribe handler.
final fileprivate class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
    // Notes on the parent type:
    //   typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    //   _subscribeHandler is a SubscribeHandler
    typealias Parent = AnonymousObservable<E>

    /// Type-erases this sink into an `AnyObserver` and passes it to the
    /// parent's `_subscribeHandler`, returning the handler's `Disposable`.
    func run(_ parent: Parent) -> Disposable {
        let erasedSelf = AnyObserver(self)
        return parent._subscribeHandler(erasedSelf)
    }
}
AnyObserver
/// A type-erased `ObserverType` that forwards every event to a stored closure.
public struct AnyObserver<Element> : ObserverType {
    /// Observer that receives sequence events.
    private let observer: (Event<Element>) -> Void

    /// Construct an instance whose `on(event)` calls `observer.on(event)`.
    ///
    /// - Parameter observer: The observer whose `on` method receives the events.
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }

    /// Send `event` to this observer.
    ///
    /// - Parameter event: Event forwarded to the wrapped observer.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
}
Subscribe
- 创建 Observable 后,通过 subscribe 订阅,会根据 Producer 来进行信号的转发,包括线程的调度和销毁的时机
- subscribe后通过AnonymousObserver创建EventHandler:
(Event<Element>) -> Void
闭包,之后通过 onCore 传递并触发 event。AnonymousObserver 继承 ObserverBase,会将 _isStopped 初始化为 0;如果事件是 .error 或 .completed,则通过线程安全的 OSAtomic 操作判断是否执行 onCore。
// Producer
class Producer<Element> : Observable<Element> {
    /// Subscribes `observer`: runs the producer and hands the resulting sink
    /// and subscription to a `SinkDisposer`, which is returned to the caller.
    ///
    /// - Parameter observer: Observer passed on to `run`.
    /// - Returns: The `SinkDisposer` holding the sink and subscription.
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let (sink, subscription) = run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sink, subscription: subscription)
        return disposer
    }
}
Dispose
// SinkDisposer
// NOTE(review): this excerpt omits the stored properties (`_state`, `_sink`,
// `_subscription`), the state enums and the remaining `Cancelable` members.
fileprivate final class SinkDisposer: Cancelable {
    /// Stores the sink and subscription, and disposes them immediately when
    /// the disposed flag was already set in `_state`.
    ///
    /// - Parameters:
    ///   - sink: Sink disposable to store.
    ///   - subscription: Subscription disposable to store.
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        // Keep both disposables; without these assignments the `nil` resets
        // below would have nothing to release.
        _sink = sink
        _subscription = subscription

        // Mark "sink and subscription set" in `_state`; the result is treated
        // below as the state from before this call.
        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)

        // Setting them a second time is a programming error.
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        // Already disposed before the sink arrived: tear everything down now.
        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
}
// Sink: forwards events to an observer until it has been disposed.
class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed: Bool

    /// Creates a sink that is not yet disposed.
    ///
    /// - Parameters:
    ///   - observer: Observer that receives forwarded events.
    ///   - cancel: Cancelable disposed together with this sink.
    init(observer: O, cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
        self._disposed = false
    }

    /// Passes `event` to the observer unless the sink was already disposed.
    final func forwardOn(_ event: Event<O.E>) {
        guard !_disposed else {
            return
        }
        _observer.on(event)
    }

    /// Whether `dispose()` has been called on this sink.
    final var disposed: Bool {
        return _disposed
    }

    /// Marks the sink as disposed and then disposes the cancelable.
    func dispose() {
        _disposed = true
        _cancel.dispose()
    }
}
event
Event 是泛型枚举类型,提供 next、error、completed 三种枚举值,也提供了对事件状态的获取:
isStopEvent
element
error
isCompleted
以上属性(isStopEvent、element、error、isCompleted)用于获取信号的状态;map 是对信号的 transform(变换)操作。
/// A single event of a sequence: an element, a terminating error, or completion.
public enum Event<Element> {
    /// An element was produced.
    case next(Element)
    /// The sequence ended with an error.
    case error(Swift.Error)
    /// The sequence ended normally.
    case completed
}

extension Event {
    /// Is `completed` or `error` event.
    public var isStopEvent: Bool {
        // Only `.next` keeps the sequence going; every other case stops it.
        if case .next = self {
            return false
        }
        return true
    }

    /// If `completed` event, returns true.
    public var isCompleted: Bool {
        switch self {
        case .completed:
            return true
        case .next, .error:
            return false
        }
    }
}