map
操作符为每一个序列元素提供转换,并返回到原序列。
看一段代码示例:
Observable<Int>.of(1,2,3,4,5,6)
.subscribe(onNext: { (val) in
print(val)
}).disposed(by: disposeBag)
输出:1,2,3,4,5,6
Observable<Int>.of(1,2,3,4,5,6)
.map{
$0+10
}
.subscribe(onNext: { (val) in
print(val)
}).disposed(by: disposeBag)
输出:11,12,13,14,15,16
-
of
初始化序列,序列元素类型需保存一直 -
map
操作符,操作序列每个元素加10后作为新元素,构成新的序列
那么map
是如何给序列重新设置新值的呢?闭包就像加工零件的数控机床,设定好加工程序$0+10
就会对of
中的每一个元素加工产出新的零件,看一下map
源码都做了哪些事情:
extension ObservableType {
public func map<R>(_ transform: @escaping (E) throws -> R)
-> Observable<R> {
return self.asObservable().composeMap(transform)
}
}
-
transform
逃逸闭包,转换逻辑交给业务层 -
asObservable()
保证协议的一致性
首先看到map
函数是一个带闭包参数的ObservableType
的扩展函数,内部调用了composeMap
并传入了外部的闭包以便内部调用。
由前边的源码探索经验可猜测,该处闭包会被保留在内部,在订阅时被使用,那么根据断点一步步探索,看看外界的闭包最终会保留在何处。composeMap
所在类:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
// 此处代码有省略
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
-
source
向_map
函数传入了self
即为当前的序列对象 -
transform
一路追踪的外部闭包
是ObservableType
的子类Observable
实现了composeMap
方法,返回Observable
类型的对象,在内部调用了_map
方法:
internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {
return Map(source: source, transform: transform)
}
还是向Map
内部传入序列,及业务层闭包,一直强调序列和业务层闭包,主要由于结构复杂,以免被遗忘,后续和订阅难以被联系在一起。继续查看Map
类:
final private class Map<SourceType, ResultType>: Producer<ResultType> {
typealias Transform = (SourceType) throws -> ResultType
private let _source: Observable<SourceType>
private let _transform: Transform
init(source: Observable<SourceType>, transform: @escaping Transform) {
self._source = source
self._transform = transform
#if TRACE_RESOURCES
_ = increment(&_numberOfMapOperators)
#endif
}
override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
let originalSelector = self._transform
return Map<SourceType, R>(source: self._source, transform: { (s: SourceType) throws -> R in
let r: ResultType = try originalSelector(s)
return try selector(r)
})
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
- 继承自
Producer
,在《RxSwift核心源码探索》中我们已经很熟悉了,继承自Observable
,提供了连接序列和观察者的方法对象sink
,及发送序列元素到观察者,再返回到订阅,这里不再叙述。 -
Map
中保留了源序列及业务层闭包方法 - 此处
run
方法会在父类Producer
类中方法调用,父类指针指向子类对象
继续断点运行就到达了我们的订阅,该处方法和《RxSwift核心源码探索》中的订阅方法为同一方法:
extension ObservableType {
//业务层订阅调用
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
self.asObservable().subscribe(observer)
此处调用的则是Producer
中的subscribe
方法,看一下该处方法:
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
此方法很熟悉,主要看一下内部self.run
方法调用,此处继承链和《RxSwift核心源码探索》中的继承链不同,继承链如下:
-
RxSwift
核心源码探索中Producer
的子类是AnonymousObservable
,run
方法在此类实现 -
Map
源码中Producer的子类是Map
,run
方法在该处被实现
此处如果不太清楚可以追溯上文查看。上面有Map
类的完整代码,此处只查看调用方法代码:
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
- 调用了
MapSink
方法此处和《RxSwift核心源码探索》中的AnnonymousObservableSink
类似 -
self._source
此处为订阅时保存的闭包 -
.subscribe(sink)Producer
类的方法,传入sink
用来调用sink
中的on
方法
类似于《RxSwift核心源码探索》中的Sink
,功能是一样的,MapSink
中保留的是观察者,Map
中保留的为可观察序列Observable
,通过Observable
来触发观察者的方法调用。subscribe
方法中调用的
sinkAndSubscription = self.run(observer, cancel: disposer)
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
fileprivate let _elements: S
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: S, scheduler: ImmediateSchedulerType) {
self._elements = elements
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
是继承自Producer
的方法,内部创建了ObservableSequenceSink
对象并传入了当前Observable
对象和observer
对象,最终调用了run()
方法,此处猜测内部为变量序列并调用观察者闭包方法,向外界发送消息。代码如下:
final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {
typealias Parent = ObservableSequence<S>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
- 注意此类继承自
Sink
,由此可知会调用Sink
中的forwardOn
方法
_elements
是由of
创建时保留的序列集合,此处对序列元素进行遍历,并调用forwardOn
方法发送元素。forwardOn
:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate var _disposed = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
init(observer: O, cancel: Cancelable) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(&self._disposed, 1) {
return
}
self._observer.on(event)
}
}
-
_observer
是上面传入的MapSink
对象
清楚看到在此处调用了sink
的on
方法,self._observer.on(event)
。继续追踪MapSink
类的on
方法:
final private class MapSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = O.E
typealias Element = SourceType
private let _transform: Transform
init(transform: @escaping Transform, observer: O, cancel: Cancelable) {
self._transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self._transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
到此处就很熟悉了,此处on
和《RxSwift核心源码探索》中不同:
-
《RxSwift核心源码探索》中此处有业务层
onNext
来触发 -
Map
中是通过设定好的of
序列直接触发
元素处理代码:
do {
let mappedElement = try self._transform(element)
self.forwardOn(.next(mappedElement))
}
-
let mappedElement = try self._transform(element)
调用外界闭包获取新值 -
self.forwardOn(.next(mappedElement))
通过forwardOn
将新值发送至订阅者
最终会调用ObserverBase中的on
方法,再调用观察者observer
的onCore
方法,向观察者发送元素。在由观察者调用业务层订阅时实现的闭包将序列元素发送到了业务层,到此map
就完成了对源序列的修改。
总结:
实际上map
就是对sink
做了一层封装,根据业务层的map
设置在ObservableSequenceSink
中处理了序列元素再发送至forwardOn
直至Observer
对象,由此完成了对元素的加工处理。
RxSwift
源码比较绕,复杂的逻辑带来的是高效的开发,高效的运行,因此对RxSwfit
源码我们还需要进一步探索理解。