【领略RxSwift源码】- 订阅的工作流(Subscribing)

RxSwift Logo

开篇

一直觉得自己似乎越来越浮躁了,可能当代的大多数年轻人都活在恐慌里,问题已经从小时候的不思进取变成了“太思进取”,似乎总是想不管什么投入都能立竿见影。但是很多时候总是事与愿违,所以这次我想试着静下心来,读一读RxSwift的源码,希望不要Give up halfway吧。

关于RxSwift,作为ReactiveX的成员框架之一,它有着血脉相承的语法,语言和哲学上的一致性使得即便之后转向其他的平台我们也能很快的上手Rx。其次,通过阅读源码,我们也可以看到,其实有的时候大神的代码也没有那么的CleanPrefect ,我们也可以在一些地方看到妥协和失误。

本篇目标

本篇的目标就是了解下面这段代码(来自Rx.playgroud)的实现:

example("just") {
    let disposeBag = DisposeBag()
    
    Observable.just("🔴")
        .subscribe { event in
            print(event)
        }
        .disposed(by: disposeBag)
}

这寥寥数行代码,我想但凡RxSwift入门的同学都知道它的用处:创建一个单值的可观察序列,并且打印出它的所有序列。恩...没毛病,但是本篇想要知道的是:

  1. 序列是如何创建的?它的结构是怎么样的?
  2. 序列是如何被观察者订阅的?

so...let's go !

众所周知,自Swift诞生以来,苹果爸爸就一直在推崇面向协议编程(POP) ,而RxSwift也是同样的,遵循了从一个协议开始,而不是从一个类开始。但是我并不想从协议讲起,因为虽然从协议讲起最具逻辑性,但是从文章的角度来说并不好理解和阅读。所以本文将以示例代码为切入点,自上而下的阅读,以求简单清晰易懂。

0x01 - Observable

-> Observable

在开篇的示例代码中,首先映入我们眼帘的是ObservableObservable调用了just方法。其实Observable是一个遵守ObservableType的类,实现代码如下:


public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        let _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        let _ = Resources.decrementTotal()
#endif
    }

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯

    /// Optimizations for map operator
    internal func composeMap<R>(_ selector: @escaping (Element) throws -> R) -> Observable<R> {
        return Map(source: self, transform: selector)
    }
}


首先这是一个用Public修饰的抽象类,它是直接面向RxSwift使用者的。在这个类当中,使用了E的别名来充当序列值的泛型类型。在Init方法中我们可以看到一个Resource的结构体,顺便提一句,这是一个用来“追踪计数”RxSwift引用资源的,每当init一个资源计数就+1,deinit的时候就总数-1,以此来追踪全局的资源使用。

除此之外,我们还可以看到有两个方法:

    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E 
    public func asObservable() -> Observable<E>

这两个方法都在ObservableType这个协议中有所定义,前者定义了Observable能够被订阅的行为,后者则是定义了可以将自身“转换”成“Observable实体”的功能。这里可能会有一些Confuse,其实前面已经提到过,单纯的Observable类并不能作为序列被直接订阅使用,只有Observable的实体子类才可以被实例化使用。

所以,我们也可以看到,subscribe函数的实现也只是简单的fatalError,并没有实际的逻辑操作:

/// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
    rxFatalError("Abstract method", file: file, line: line)
}

-> Observable - > ObservableType

现在我们再来看一下ObservableType:

public protocol ObservableType : ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<E> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}

这两个方法的作用前面已经讲过,在这里我们可以看到通过Protocol ExtensionRxSwift为ObservableType提供了默认的asObservable的实现,那么ObservableConvertibleType是一个什么协议呢,是不是有它从根源上定义了asObservable方法呢?我们来看一下ObservableConvertibleType的定义:

/// Type that can be converted to observable sequence (`Observer<E>`).
public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<E>
}

果然如此,那么至此为止,Observable之前的协议继承体系我们已经明了,画成图大概是这样的:

Observable Protocol

很遗憾,至此为止我们并没有看到太多的实现逻辑,但是我们看到了一系列ObservableProtocol根基。那么具体的Observable实体应该是怎么样的呢?我们从just身上来找到答案。

Observable+Creation.swift文件中,我们找到了关于just的定义:

Observable+Creation.swift是一个Observable的一个拓展(extension),文件中我们可以看到很多关于构建Observable实体的方法,诸如create, empty, never等等,本篇以just作为切入点,其实其他的公开的Creation函数也是类似的逻辑,所以不会一一介绍了。

    public static func just(_ element: E) -> Observable<E> {
        return Just(element: element)
    }

我们可以看到,这是一个public修饰的暴露在外的Observable的静态方法,返回的也是Observable类型。那么这个Just是什么呢?

final class Just<Element> : Producer<Element> {
    private let _element: Element
    
    init(element: Element) {
        _element = element
    }
    
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(_element))
        observer.on(.completed)
        return Disposables.create()
    }
}

我们可以看到,这是一个继承自Producer的一个类,OK,我们先不去管这个Just,先去看看Producer是一个什么东西:


class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }
    
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            let disposer = SinkDisposer()
            let sinkAndSubscription = run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
    
    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

看到这里我们清楚了,Producer是继承自Observable的一个抽象类,结合前面的Just,于是我们的图可以画成这样了:

Paste_Image.png

Observable的子类Producer我们可以看到该基类实现subscribe的基础方法,这里用到了RxSwift当中的另外一个概念--Scheduler,但是它不是本文的重点,我们将在接下来的文章里面去集中讨论它,这里只是做一些简单的解读(感觉给自己埋了坑)。

Producer中,subscribe主要做了以下几件事情:

  1. 创建一个SinkDisposer
  2. 判断是否需要Scheduler来进行切换线程的调用,如果需要那么就在指定的线程中操作。
  3. 调用run方法,将observer和刚刚创建的SinkDisposer作为入参,得到一个SinkSubscription的一个元组。这里的SinkSubscription都是遵守Disposable的类。
    4.SinkDisposer对传入之前的SinkSubscription执行setSinkAndSubscription方法。
    5.将执行完setSinkAndSubscription方法的disposer作为返回值返回。

这里的相关操作其实都容易理解,首先看看这个run:

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }

好,既然这是一个抽象方法,那么我们暂且先不去管它,今晚不是它的轮次(最近🐺杀玩多了)。除去这个方法,最让人疑惑的就是这个setSinkAndSubscription方法,那么它的作用是什么呢?

我们先来谈一谈SinkDisposer类,但是再谈它之前我们需要先知道它所遵守的协议。SinkDisposer是一个遵守Cancelable协议的类,那么这个Cancelable是何方神圣呢?

这一切都要从Disposable说起。

0X02-Dispose

Disposable

Disposable只是一个简单的协议,其中只有一个dispose方法,定义了释放资源的统一行为。


/// Respresents a disposable resource.
public protocol Disposable {
    /// Dispose resource.
    func dispose()
}

OK,这个很简单Cancelable呢?

/// Represents disposable resource with state tracking.
public protocol Cancelable : Disposable {
    /// Was resource disposed.
    var isDisposed: Bool { get }
}

没错,Cancelable只是一个继承自Disposable的一个协议,其中定义了一个Bool类型的isDisposed标识,用来标识是否该序列已经被释放。

SinkDisposer

OK,现在我们终于来到了SinkDisposer类,先上源码:


fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: UInt32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    // Jeej, swift API consistency rules
    fileprivate enum DisposeStateInt32: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }
    
    private var _state: AtomicInt = 0
    private var _sink: Disposable? = nil
    private var _subscription: Disposable? = nil

    var isDisposed: Bool {
        return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
    }

    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        _sink = sink
        _subscription = subscription

        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
    
    func dispose() {
        let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = _sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = _subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            _sink = nil
            _subscription = nil
        }
    }
}


在这里我们看到的一些以Atomic开头的方法都是在OSAtomic.h中所定义的自动读取和更新指定值方法,详细的使用方法可以点这里的官方文档。这里使用的Atomic方法是为了区分以下几种可能性:

case1 - 第一次执行

  1. sinksubscription赋值给自身的私有变量。
  2. 通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为0。
  3. previousStateDisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑运算,得值为0所以不执行if里面的逻辑。
  4. previousStateDisposeStateInt32.disposed.rawValue 做逻辑运算,的值为0,所以也不执行if里面的逻辑。
  5. 结束。

case2 - 再次执行

1.由于没有执行过dispose方法,所以自从第一次执行setSinkAndSubscription之后,_state的值一直为2。当执行previousStateDisposeStateInt32.disposed.rawValue的时候,的值为2,所以执行xFatalError("Sink and subscription were already set")程序中止运行。

case3 - 先执行过dispose,然后第一次执行

  1. 由于执行过dispose 方法,所以_state的值为1。
  2. 通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为1。
  3. previousStateDisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑运算,得值为0所以不执行if里面的逻辑。
  4. previousStateDisposeStateInt32.disposed.rawValue 做逻辑运算,的值为1,所以执行if内的操作,将sinksubscription分别执行dispose操作,并且将两个私有变量置nil,打破引用环。

我们可以看到,通过一个_stateOSAtomic的方法,RxSwift非常优雅的解决了上述三种场景,非常值得借鉴。而本类中的dispose方法其实也是类似的处理方法,来保证只有一次有效的dispose操作,本文就不再赘述。

0X03 - Observer

接下来我们来讲讲RxSwift中的另外一个角色,Observer(观察者),这次我们从观察者的基类ObserverBase谈起:

ObserverBase是一个遵守了DisposableObserverType协议的一个抽象类,实现了ondispose。值得注意的是,在ObserverBase中有一个私有变量:

    private var _isStopped: AtomicInt = 0

_isStopped是一个哨兵,用来标记所观察的序列是否已经停止了,那么什么时候需要标记为Stop呢?我们来看这段代码:

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if _isStopped == 0 {
                onCore(event)
            }
        case .error, .completed:
            if !AtomicCompareAndSwap(0, 1, &_isStopped) {
                return
            }
            onCore(event)
        }
    }

这段代码做的事情很简单:

  1. 只要_isStopped为0,那么就允许“发射”.next事件,也就是执行onCore方法。
  2. 当第一次“发射”.error或者.completed时,执行一次onCore,并且将_isStopped设为1。

因为所有的Observer类在事件发射的逻辑上面都相同,所以统一在ObserverBase中作了处理,这也是典型的OOP思想。老铁,没毛病~

值得一提的是,我们可以看到这里使用了一个AtomicCompareAndSwap的方法,这个方法是做什么的呢?在Platform.Darwin.swift中,我们可以看到关于这个方法的定义:


    typealias AtomicInt = Int32

    let AtomicCompareAndSwap = OSAtomicCompareAndSwap32Barrier
    let AtomicIncrement = OSAtomicIncrement32Barrier
    let AtomicDecrement = OSAtomicDecrement32Barrier

我们可以看到,AtomicCompareAndSwap其实就是OSAtomic库中所定义的一个全局方法:

/*! @abstract Compare and swap for 32-bit values with barrier.
    @discussion
    This function compares the value in <code>__oldValue</code> to the value
    in the memory location referenced by <code>__theValue</code>.  If the values
    match, this function stores the value from <code>__newValue</code> into
    that memory location atomically.

    This function is equivalent to {@link OSAtomicCompareAndSwap32}
    except that it also introduces a barrier.
    @result Returns TRUE on a match, FALSE otherwise.
 */
@available(iOS 2.0, *)
@available(iOS, deprecated: 10.0, message: "Use atomic_compare_exchange_strong() from <stdatomic.h> instead")
public func OSAtomicCompareAndSwap32Barrier(_ __oldValue: Int32, _ __newValue: Int32, _ __theValue: UnsafeMutablePointer<Int32>!) -> Bool

简单的来说,该方法传入三个参数:__oldValue__newValue__theValue,前两个参数都是Int32类型的,后一个是UnsafeMutablePointer<Int32>的可变指针。当__oldValue的值和指针所指向的内存地址的变量的值相等时,返回true否则为false,于此同时,如果__newValue和当前的值不相等,那么就赋值,使得__theValue的值为新值。只要__oldValue与指针所指向的值相同,那么就会对该指针进行一次赋值操作(此处感谢jmstack的指正)。

伪代码如下:

f (*pointer == oldvalue) {  
    *pointer = newvalue;  
    return 1;  
} else {  
    return 0;  
}  

番外 - Memery barrier

为了达到最佳性能,编译器通常会对汇编基本的指令进行重新排序来尽可能保持处理器的指令流水线。作为优化的一部分,编译器有可能对访问主内存的指令,如果它认为这有可能产生不正确的数据时,将会对指令进行重新排序。不幸的是,靠编译器检测到所有可能内存依赖的操作几乎总是不太可能的。如果看似独立的变量实际上是相互影响,那么编译器优化有可能把这些变量更新位错误的顺序,导致潜在不不正确结果。

内存屏障(memory barrier)是一个使用来确保内存操作按照正确的顺序工作的非阻塞的同步工具。内存屏障的作用就像一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作,才允许它执行位于屏障之后的加载和存储操作。内存屏障同样使用来确保一个线程(但对另外一个线程可见)的内存操作总是按照预定的顺序完成。如果在这些地方缺少内存屏障有可能让其他线程看到看似不可能的结果。为了使用一个内存屏障,你只要在你代码里面需要的地方简单的调用OSMemoryBarrier函数。

匿名观察者

看完了ObserverBase现在我们来看一下AnonymousObserver:

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        let _ = Resources.incrementTotal()
#endif
        _eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return _eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        let _ = Resources.decrementTotal()
    }
#endif
}

我们可以看到,在这个匿名观察者中,它主要做的事情就是将基类ObserverBase所没有实现的onCore方法实现了,将观察者构造方法时传入的EventHandleronCore方法中执行。这也就是观察者受到序列事件的动作。

订阅过程

在我们对ObservableObserverDisposeable有了一定的认知之后,我们可以来认识一下最为关键的一步,subscribe也就是订阅。

ObservableType+Extensions.swift中我们可以看到相关的实现:


    /**
    Subscribes an event handler to an observable sequence.

    - parameter on: Action to invoke for each event in the observable sequence.
    - returns: Subscription object used to unsubscribe from the observable sequence.
    */
    public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.subscribeSafe(observer)
    }

所谓的subscribe其实只是做了两个事情。首先是构造了一个匿名观察者,将on也就是(Event<E>) -> Void类型的闭包作为参数,每次在匿名观察者有新的事件的时候调用,这里也用到了尾随闭包的语法糖,提高阅读性。其次,将刚刚构造的匿名观察者,通过subscribeSafe函数来完成订阅。那么subscribeSafe究竟做了一些什么事情呢?

extension ObservableType {
    func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        return self.asObservable().subscribe(observer)
    }
}

subscribeSafe是一个内部的方法,所有内部的订阅操作全部通过该方法来完成,一般最后都是通过subscribe方法的多态性来完成最终的订阅,那么回想一下之前�Justsubscribe方法我们就可以知道,一旦调用subscribe方法,Just立刻给匿名观察者发送一个包裹了初始值的.next事件和一个.completed事件,最后返回一个NopDisposable类型的“存根”,NopDisposable是一个在执行dispose操作时不进行任何操作的存根。然后整个订阅过程就结束了。

DisposeBag

对于开头的代码,我们现在唯一还没有讲到的就是addDisposableTo这个方法,我们都知道,当一个序列执行subscribe之后我们会得到一个遵守Disposable的存根。那么根据方法名,我们也可以猜到这个一个将存根添加到一个地方的方法,那么它是要将存根添加到哪里呢?

没错!就是我们天天在写的DisposeBag。在DisposeBag.swift中我们可以找到该方法的定义:

extension Disposable {
    /// Adds `self` to `bag`.
    ///
    /// - parameter bag: `DisposeBag` to add `self` to.
    public func addDisposableTo(_ bag: DisposeBag) {
        bag.insert(self)
    }
}

那么DisposeBag到底是个什么东西呢?talk is cheap,我们直接来看源码:

public final class DisposeBag: DisposeBase {
    
    private var _lock = SpinLock()
    
    // state
    private var _disposables = [Disposable]()
    private var _isDisposed = false
    
    /// Constructs new empty dispose bag.
    public override init() {
        super.init()
    }
    
    /// Adds `disposable` to be disposed when dispose bag is being deinited.
    ///
    /// - parameter disposable: Disposable to add.
    public func insert(_ disposable: Disposable) {
        _insert(disposable)?.dispose()
    }
    
    private func _insert(_ disposable: Disposable) -> Disposable? {
        _lock.lock(); defer { _lock.unlock() }
        if _isDisposed {
            return disposable
        }

        _disposables.append(disposable)

        return nil
    }

    /// This is internal on purpose, take a look at `CompositeDisposable` instead.
    private func dispose() {
        let oldDisposables = _dispose()

        for disposable in oldDisposables {
            disposable.dispose()
        }
    }

    private func _dispose() -> [Disposable] {
        _lock.lock(); defer { _lock.unlock() }

        let disposables = _disposables
        
        _disposables.removeAll(keepingCapacity: false)
        _isDisposed = true
        
        return disposables
    }
    
    deinit {
        dispose()
    }
}

其实DisposeBag这个类设计的还是非常的简单明了的,暴露给外部的只有一个insert方法,将需要被管理的Dispose交给这个Bag,当该Bag执行deinit方法的时候执行dispose,将所持有的所有Disposable遍历一遍,同时挨个dispose,值得注意的是,该类内部使用了一个锁:

    private var _lock = SpinLock()

这个SpinLock其实就是一个NSRecursiveLock的递归锁,该🔐的作用就是为了保证_disposables的数组线程安全,之所以用递归锁是因为有可能会出现在相同的线程多次调用insert的而引发死锁。

正常情况下,执行insert方法,首先会执行加锁操作,然后Bag会将该Disposable加入到_disposables这个数组中,最后解锁。但是还有一种情况,那就是当执行insert操作的时候,该Bag已经被析构了,那么我们就不需要再将其加入数组,直接将该Disposable释放掉就可以了。

QA

分析了上述的源码之后,我想开篇的两个问题我们也可以大致回答了:

1.序列是如何创建的?它的结构是怎么样的?

其实无论是通过just方法构建的序列还是createempty或者of方法构建的序列,最终得到的都是一个Producer的子类,只是不同的方法所构建的序列行为不一样,所以要通过子类的方法重写来实现多态,这是典型的OOP思想这里也就不多解释了。至于它的结构也是取决该序列的特性,比如本文中的Just,由于它是一个单一序列,只会有第一次的初始值,所以在Just类中直接定义了一个私有的存储类型的变量来存储初始值,当完成订阅操作的时候,直接将该值通过.next事件发送出去,然后再将.completed事件发送,完成整个序列的生命周期。再比如通过never构建的EmptyProducer类,由于该序列需要做的只是永远不发送.next事件,所以EmptyProducer没有任何私有变量必要,他要做的只是在完成订阅的时候发送一个.completed事件。由于Producer的子类太多了,篇幅有限就不在这里一一列举。

2.序列是如何被观察者订阅的?

当一个序列构造完毕的之后,调用subscribe方法会进行SubscribeHandler,也就是进行订阅的相关操作。具体来说,对于Just这个序列,SubscribeHandler指的是就是发送一个.next(element)事件和一个.completed事件;对于NeverProducer这个序列,SubscribeHandler指的是单单只发送一个.completed事件;所以对于不同的SubscribeHandler会有不同的订阅操作,总的来说是根据序列的特性来发送给观察者不同的事件流。

值得一提的是在RxSwift中还有一个很重要的概念Sink,关于它的解释可以参考一下这个issueSink相当与一个加工者,可以将源事件流转换成一个新的事件流,如果讲事件流比作水流,事件的传递过程比作水管,那么Sink就相当于水管中的一个转换头。关于Sink我们会在之后的文章中详细的讲述。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 当程序员原来越浮躁了,项目做多了大都是雷同的, 对技术没啥帮助,读一些牛逼的第三方框架,有助于提升,关于RxSwi...
    水落斜阳阅读 756评论 0 1
  • 本文章内部分图片资源来自RayWenderlich.com 本文结合自己的理解来总结介绍一下RxSwift最基本的...
    FKSky阅读 2,866评论 4 14
  • 最近在学习RxSwift相关的内容,在这里记录一些基本的知识点,以便今后查阅。 Observable 在RxSwi...
    L_Zephyr阅读 1,746评论 1 4
  • 开篇 在上一篇中,我们分析了在RxSwift中的整个订阅流程。在开讲变换操作之前,首先要弄清楚Sink的概念,不清...
    Maru阅读 1,852评论 3 9
  • 加油,尽可能一定要完成任务啊!!! 本周自我工作学习安排: 总体任务: 周二,四,六晚上8点半到11点《动脑学院测...
    郑钱钱的小笔记阅读 359评论 4 1