Map运算如下图所示:
代码如下:
extension ObservableType {
    /// Transforms every element of the sequence with `selector`.
    ///
    /// The receiver is first converted with `asObservable()`, and the actual
    /// operator is built by that observable's `composeMap`.
    ///
    /// - Parameter selector: Throwing transform applied to each element.
    /// - Returns: An observable sequence of the transformed elements.
    public func map<R>(selector: E throws->R)->Observable<R> {
        let source = asObservable()
        return source.composeMap(selector)
    }

    /// Transforms every element of the sequence with `selector`, which also
    /// receives an `Int` counter maintained by `MapWithIndexSink`.
    ///
    /// - Parameter selector: Throwing transform taking the element and the counter value.
    /// - Returns: An observable sequence of the transformed elements.
    public func mapWithIndex<R>(selector: (E, Int) throws->R)->Observable<R> {
        let source = asObservable()
        return MapWithIndex(source: source, selector: selector)
    }
}
/// Base class for observable sequences that carry values of type `Element`.
///
/// `subscribe` has no real implementation here: it only calls `abstractMethod()`.
public class Observable<Element>: ObservableType {
    /// Element type of the sequence, as required by `ObservableType`.
    public typealias E = Element

    init() {
    }

    /// Subscribes `observer` to this sequence.
    ///
    /// This base implementation only calls `abstractMethod()` and produces no
    /// value of its own (presumably `abstractMethod()` never returns — confirm
    /// at its definition).
    ///
    /// - Parameter observer: Observer whose element type equals `E`.
    /// - Returns: A `Disposable` for the subscription.
    public func subscribe<O: ObserverType where O.E == E>(observer: O)->Disposable {
        abstractMethod()
    }

    /// Returns the receiver itself; it already is an `Observable<E>`.
    public func asObservable()->Observable<E> {
        return self
    }

    deinit {
    }

    /// Builds the `map` operator on top of this observable.
    ///
    /// Kept as an overridable method so that `Map` can merge two consecutive
    /// selectors into a single operator instead of stacking two of them.
    ///
    /// - Parameter selector: Throwing transform applied to each element.
    /// - Returns: A `Map` that reads from `self` and applies `selector`.
    internal func composeMap<R>(selector: Element throws->R)->Observable<R> {
        return Map(source: self, selector: selector)
    }
}
/// Producer that applies a throwing selector to every element of a source
/// observable.
class Map<SourceType, ResultType>: Producer<ResultType> {
    /// Transform from a source element to a result element.
    typealias Selector = (SourceType) throws->ResultType

    private let _source: Observable<SourceType>
    private let _selector: Selector

    /// - Parameters:
    ///   - source: Observable whose elements are transformed.
    ///   - selector: Transform applied to each element.
    init(source: Observable<SourceType>, selector: Selector) {
        _selector = selector
        _source = source
    }

    /// Merges this operator's selector with `selector` into one `Map` that
    /// reads directly from the original source.
    ///
    /// - Parameter selector: Transform applied to the output of this map's selector.
    /// - Returns: A `Map` from `SourceType` to `R` over the same source.
    override func composeMap<R>(selector: ResultType throws ->R)->Observable<R> {
        // Capture only the stored selector in the closure, not `self`.
        let first = _selector
        let fused: (SourceType) throws->R = { (input: SourceType) throws->R in
            let intermediate: ResultType = try first(input)
            return try selector(intermediate)
        }
        return Map<SourceType, R>(source: _source, selector: fused)
    }

    /// Creates a `MapSink` for `observer`, subscribes it to the source and
    /// returns the sink.
    ///
    /// - Parameter observer: Observer that receives the transformed events.
    /// - Returns: The sink; its `disposable` holds the source subscription.
    override func run<O: ObserverType where O.E == ResultType>(observer: O)->Disposable {
        let mapSink = MapSink(selector: _selector, observer: observer)
        let subscription = _source.subscribe(mapSink)
        mapSink.disposable = subscription
        return mapSink
    }

    deinit {
    }
}
/// Sink that receives source events, applies the selector to each `.Next`
/// element and forwards the result through `forwardOn`.
///
/// `forwardOn` and `dispose` are not defined in this class (presumably they are
/// inherited from `Sink` — confirm at its definition).
class MapSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {
    /// Transform from a source element to the observer's element type.
    typealias Selector = (SourceType) throws -> ResultType
    /// Element type expected by the downstream observer.
    typealias ResultType = O.E
    /// Element type this sink itself observes.
    typealias Element = SourceType

    private let _selector: Selector

    /// - Parameters:
    ///   - selector: Transform applied to each incoming element.
    ///   - observer: Downstream observer handed to the `Sink` initializer.
    init(selector: Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }

    /// Handles one event from the source.
    ///
    /// - `.Next`: maps the element and forwards it; if the selector throws,
    ///   forwards the thrown error as `.Error` and disposes.
    /// - `.Error` / `.Completed`: forwards the event and disposes.
    func on(event: Event<SourceType>) {
        switch event {
        case .Next(let element):
            do {
                let mappedElement = try _selector(element)
                forwardOn(.Next(mappedElement))
            } catch let e {
                forwardOn(.Error(e))
                dispose()
            }
        case .Error(let error):
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            forwardOn(.Completed)
            dispose()
        }
    }
}
/// Producer that applies a throwing selector to every element of a source
/// observable, passing an `Int` counter alongside the element.
class MapWithIndex<SourceType, ResultType>: Producer<ResultType> {
    /// Transform from a source element and counter value to a result element.
    typealias Selector = (SourceType, Int) throws->ResultType

    private let _source: Observable<SourceType>
    private let _selector: Selector

    /// - Parameters:
    ///   - source: Observable whose elements are transformed.
    ///   - selector: Transform applied to each element together with the counter.
    init(source: Observable<SourceType>, selector: Selector) {
        _source = source
        _selector = selector
    }

    /// Creates a `MapWithIndexSink` for `observer`, subscribes it to the source
    /// and returns the sink.
    ///
    /// - Parameter observer: Observer that receives the transformed events.
    /// - Returns: The sink; its `disposable` holds the source subscription.
    override func run<O: ObserverType where O.E == ResultType>(observer: O)->Disposable {
        let sink = MapWithIndexSink(selector: _selector, observer: observer)
        sink.disposable = _source.subscribe(sink)
        return sink
    }
}
/// Sink that receives source events, applies the indexed selector to each
/// `.Next` element and forwards the result through `forwardOn`.
///
/// `forwardOn` and `dispose` are not defined in this class (presumably they are
/// inherited from `Sink` — confirm at its definition).
class MapWithIndexSink<SourceType, O: ObserverType>: Sink<O>, ObserverType {
    /// Transform from a source element and counter value to the observer's element type.
    typealias Selector = (SourceType, Int) throws->ResultType
    /// Element type expected by the downstream observer.
    typealias ResultType = O.E
    /// Element type this sink itself observes.
    typealias Element = SourceType
    /// Producer type this sink belongs to.
    typealias Parent = MapWithIndex<SourceType, ResultType>

    private let _selector: Selector
    /// Counter handed to `incrementChecked` on every `.Next`; starts at 0.
    private var _index = 0

    /// - Parameters:
    ///   - selector: Transform applied to each incoming element and counter value.
    ///   - observer: Downstream observer handed to the `Sink` initializer.
    init(selector: Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }

    /// Handles one event from the source.
    ///
    /// - `.Next`: advances the counter, maps the element and forwards it; if
    ///   `incrementChecked` or the selector throws, forwards the thrown error
    ///   as `.Error` and disposes.
    /// - `.Error` / `.Completed`: forwards the event and disposes.
    func on(event: Event<SourceType>) {
        switch event {
        case .Next(let value):
            do {
                let position = try incrementChecked(&_index)
                let transformed = try _selector(value, position)
                forwardOn(.Next(transformed))
            } catch let failure {
                forwardOn(.Error(failure))
                dispose()
            }
        case .Error(let failure):
            forwardOn(.Error(failure))
            dispose()
        case .Completed:
            forwardOn(.Completed)
            dispose()
        }
    }
}