rxswift

Observable

rxswift 核心就是围绕着Observable 一系列的创建,发送,变换,组合,销毁等的操作

创建
let sub = Observable<Int>.create(observer,Disposable) {

    obser.onNext(0)
    obser.onNext(1)
    obser.onCompleted()

    return Disposables.create()
}
Observable
  • Observable 继承 ObservableType 协议,

  • // class Observable
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
  • ObservableType 继承 ObservableConvertibleType, ObservableType 协议约定了两个方法,subscribe asObservable subscribe是实现接受events的方法,asObservable是 转换 ObservableType to Observable

  • /// Represents a push style sequence.
    public protocol ObservableType : ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype E
        func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
    }
    
    extension ObservableType {
        
        /// Default implementation of converting `ObservableType` to `Observable`.
        public func asObservable() -> Observable<E> {
            // temporary workaround
            //return Observable.create(subscribe: self.subscribe)
            return Observable.create { o in
                return self.subscribe(o)
            }
        }
    }
    
  • ObservableConvertibleType 有个关联的范型值E,返回Observable本身的asObservable 函数。

  • public protocol ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype E
    
        /// Converts `self` to `Observable` sequence.
        func asObservable() -> Observable<E>
    }
    
  • ObserverType 协议 提供了关联范性值E,约定了on,onNext,onCompleted,onError 方法,提供了迭代对event的发送处理。

    /// Supports push-style iteration over an observable sequence.
    public protocol ObserverType {
        associatedtype E
        func on(_ event: Event<E>)
    }
    extension ObservableType {
        
        /// 所有内部订阅调用
           func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
            return self.asObservable().subscribe(observer)
        }
        
        /// 事件处理操作
        public func subscribe(_ on: @escaping (Event<E>) -> Void) -> Disposable {
            
            let observer = AnonymousObserver { e in
                // 事件传递                             
                on(e)
            }
            return self.subscribeSafe(observer)
        }
    }
    
    
    /// Convenience API extensions to provide alternate next, error, completed events
    extension ObserverType {
    
        public final func onNext(_ element: E) {
            on(.next(element))
        }
    
        public final func onCompleted() {
            on(.completed)
        }
    
        public final func onError(_ error: Swift.Error) {
            on(.error(error))
        }
    }
    
Create
  • create 是对Observable的扩展,返回AnonymousObservable ,AnonymousObservable继承Producer ,Producer继承Observable,Producer主要是重写subscribe 函数,负责run执行分发事件和通过SinkDisposer 销毁事件
  • SubscribeHandler 是一个执行 传入AnyObserver ,返回Disposable 的闭包,在create时候,初始化。
//  Creates an observable
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E>{
        return AnonymousObservable(subscribe) 
}

// AnonymousObservable run
final fileprivate class AnonymousObservable<Element> : Producer<Element> {
    
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

// AnonymousObservableSink run
final fileprivate class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {

    // typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    // _subscribeHandler = SubscribeHandler
    typealias Parent = AnonymousObservable<E>
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

AnyObserver

public struct AnyObserver<Element> : ObserverType {
    
    /// Construct an instance
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    /// Observer that receives sequence events.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    
    /// Send `event` to this observer.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

}
Subscribe
  • 创建Observable后,通过subscribe订阅后,回根据Producer 来进行信号的转发,包括线程的调度,销毁的时机
  • subscribe后通过AnonymousObserver创建EventHandler: (Event<Element>) -> Void 闭包后通过onCore传递触发event,AnonymousObserver继承ObserverBase ,回初始化 _isStopped 的值为0,如果是.error, .completed ,通过线程安全的OSAtomic 判断是否执行onCore,
// Producer
class Producer<Element> : Observable<Element> {
    
    // 订阅
   override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {

        // The returned disposable needs to release all references once it was disposed.
        let disposer = SinkDisposer()
        let sinkAndSubscription = run(observer, cancel: disposer)
        disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

        return disposer
    }  
}


Dispose
// SinkDisposer 
fileprivate final class SinkDisposer: Cancelable {
    // 销毁
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        
        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
}

// Sink
class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed: Bool

    init(observer: O, cancel: Cancelable) {

        _observer = observer
        _cancel = cancel
        _disposed = false
    }
    
    final func forwardOn(_ event: Event<O.E>) {
    
        if _disposed {
            return
        }
        _observer.on(event)
    }
    
    final var disposed: Bool {
        return _disposed
    }

    func dispose() {
        _disposed = true
        _cancel.dispose()
    }
}

event

event是范型枚举类型,提供 next,error,completed 三种枚举值,也提供了对事件的状态的获取。isStopEvent element error isCompleted 对信号的状态状态 ,map 是对信号的transform操作

public enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}

extension Event {
    /// Is `completed` or `error` event.
    public var isStopEvent: Bool {
        switch self {
        case .next: return false
        case .error, .completed: return true
        }
    }
    /// If `completed` event, returns true.
    public var isCompleted: Bool {
        if case .completed = self {
            return true
        }
        return false
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容