RxSwift-map源码解析

map操作符为每一个序列元素提供转换,并返回到原序列。

map.png

看一段代码示例:

Observable<Int>.of(1,2,3,4,5,6)
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)

输出:1,2,3,4,5,6

Observable<Int>.of(1,2,3,4,5,6)
    .map{
        $0+10
    }
    .subscribe(onNext: { (val) in
        print(val)
    }).disposed(by: disposeBag)

输出:11,12,13,14,15,16

  • of初始化序列,序列元素类型需保存一直
  • map操作符,操作序列每个元素加10后作为新元素,构成新的序列

那么map是如何给序列重新设置新值的呢?闭包就像加工零件的数控机床,设定好加工程序$0+10就会对of中的每一个元素加工产出新的零件,看一下map源码都做了哪些事情:

extension ObservableType {
    public func map<R>(_ transform: @escaping (E) throws -> R)
        -> Observable<R> {
        return self.asObservable().composeMap(transform)
    }
}
  • transform逃逸闭包,转换逻辑交给业务层
  • asObservable()保证协议的一致性

首先看到map函数是一个带闭包参数的ObservableType的扩展函数,内部调用了composeMap并传入了外部的闭包以便内部调用。

由前边的源码探索经验可猜测,该处闭包会被保留在内部,在订阅时被使用,那么根据断点一步步探索,看看外界的闭包最终会保留在何处。composeMap所在类:

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
          // 此处代码有省略
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
  • source_map函数传入了self即为当前的序列对象
  • transform一路追踪的外部闭包

ObservableType的子类Observable实现了composeMap方法,返回Observable类型的对象,在内部调用了_map方法:

internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
    return Map(source: source, transform: transform)
}

还是向Map内部传入序列,及业务层闭包,一直强调序列和业务层闭包,主要由于结构复杂,以免被遗忘,后续和订阅难以被联系在一起。继续查看Map类:

final private class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let _source: Observable<SourceType>

    private let _transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self._source = source
        self._transform = transform

#if TRACE_RESOURCES
        _ = increment(&_numberOfMapOperators)
#endif
    }

    override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
        let originalSelector = self._transform
        return Map<SourceType, R>(source: self._source, transform: { (s: SourceType) throws -> R in
            let r: ResultType = try originalSelector(s)
            return try selector(r)
        })
    }

    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
        let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}
  • 继承自Producer,在《RxSwift核心源码探索》中我们已经很熟悉了,继承自Observable,提供了连接序列和观察者的方法对象sink,及发送序列元素到观察者,再返回到订阅,这里不再叙述。
  • Map中保留了源序列及业务层闭包方法
  • 此处run方法会在父类Producer类中方法调用,父类指针指向子类对象

继续断点运行就到达了我们的订阅,该处方法和《RxSwift核心源码探索》中的订阅方法为同一方法:

extension ObservableType {
    //业务层订阅调用
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

self.asObservable().subscribe(observer)此处调用的则是Producer中的subscribe方法,看一下该处方法:

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
}

此方法很熟悉,主要看一下内部self.run方法调用,此处继承链和《RxSwift核心源码探索》中的继承链不同,继承链如下:

  • RxSwift核心源码探索中Producer的子类是AnonymousObservablerun方法在此类实现
  • Map源码中Producer的子类是Maprun方法在该处被实现

此处如果不太清楚可以追溯上文查看。上面有Map类的完整代码,此处只查看调用方法代码:

override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
    let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
    let subscription = self._source.subscribe(sink)
    return (sink: sink, subscription: subscription)
}
  • 调用了MapSink方法此处和《RxSwift核心源码探索》中的AnnonymousObservableSink类似
  • self._source此处为订阅时保存的闭包
  • .subscribe(sink)Producer类的方法,传入sink用来调用sink中的on方法

类似于《RxSwift核心源码探索》中的Sink,功能是一样的,MapSink中保留的是观察者,Map中保留的为可观察序列Observable,通过Observable来触发观察者的方法调用。subscribe方法中调用的

  • sinkAndSubscription = self.run(observer, cancel: disposer)
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
    fileprivate let _elements: S
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: S, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

是继承自Producer的方法,内部创建了ObservableSequenceSink对象并传入了当前Observable对象和observer对象,最终调用了run()方法,此处猜测内部为变量序列并调用观察者闭包方法,向外界发送消息。代码如下:

final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {
    typealias Parent = ObservableSequence<S>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}
  • 注意此类继承自Sink,由此可知会调用Sink中的forwardOn方法

_elements是由of创建时保留的序列集合,此处对序列元素进行遍历,并调用forwardOn方法发送元素。forwardOn

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate var _disposed = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(&self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }
}
  • _observer是上面传入的MapSink对象

清楚看到在此处调用了sinkon方法,self._observer.on(event)。继续追踪MapSink类的on方法:

final private class MapSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = O.E
    typealias Element = SourceType

    private let _transform: Transform

    init(transform: @escaping Transform, observer: O, cancel: Cancelable) {
        self._transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self._transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

到此处就很熟悉了,此处on《RxSwift核心源码探索》中不同:

元素处理代码:

do {
    let mappedElement = try self._transform(element)
    self.forwardOn(.next(mappedElement))
}
  • let mappedElement = try self._transform(element)调用外界闭包获取新值
  • self.forwardOn(.next(mappedElement))通过forwardOn将新值发送至订阅者

最终会调用ObserverBase中的on方法,再调用观察者observeronCore方法,向观察者发送元素。在由观察者调用业务层订阅时实现的闭包将序列元素发送到了业务层,到此map就完成了对源序列的修改。

总结:

实际上map就是对sink做了一层封装,根据业务层的map设置在ObservableSequenceSink中处理了序列元素再发送至forwardOn直至Observer对象,由此完成了对元素的加工处理。

RxSwift源码比较绕,复杂的逻辑带来的是高效的开发,高效的运行,因此对RxSwfit源码我们还需要进一步探索理解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容