在RxSwift文档中,介绍了它的常用框架,现在就来看看其中之一RxFeedback,至于用法与优势可以查看文档中的介绍,现在通过源码来分析其中状态与事件如何联系起来的。在这个框架中主要成分有State、Event、Mutation,然后通过reduce(加工程序)和feedback(连接管理)把它们串联起来形成一个反馈系统。一个反馈就是:事件(Event)发出指令(Mutation),通过加工程序处理,更新状态(State),反馈到状态的订阅者。事件序列的创建和状态的订阅在feedback中完成。为了方便,在下文中,面对于反馈系统进行的输入和反馈处理的载体称为客户环境。
先来看看系统创建的主要源码:
public static func system<State, Mutation>(
initialState: State,
reduce: @escaping (State, Mutation) -> State,
scheduler: ImmediateSchedulerType,
scheduledFeedback: [Feedback<State, Mutation>]
) -> Observable<State> {
return Observable<State>.deferred {
let replaySubject = ReplaySubject<State>.create(bufferSize: 1)
let asyncScheduler = scheduler.async
let mutations: Observable<Mutation> = Observable.merge(scheduledFeedback.map { feedback in
let state = ObservableSchedulerContext(source: replaySubject.asObservable(), scheduler: asyncScheduler)
return feedback(state)
})
// This is protection from accidental ignoring of scheduler so
// reentracy errors can be avoided
.observeOn(CurrentThreadScheduler.instance)
return mutations.scan(initialState, accumulator: reduce)
.do(onNext: { output in
replaySubject.onNext(output)
}, onSubscribed: {
replaySubject.onNext(initialState)
})
.subscribeOn(scheduler)
.startWith(initialState)
.observeOn(scheduler)
}
}
第一步,是创建State的可观察序列replaySubject,其后通过ObservableSchedulerContext进行包装为state。注意state的subscribe方法中是直接调用的自身source的订阅方法return self.source.subscribe(observer),所以后续订阅state对象就是直接订阅replaySubject。
第二步, feedback(state),在这个方法中处理了两件事情,一个是通过这个方法传入state让其能够在客户环境中被订阅,二是在客户环境中创建包含指定指令的事件序列mutations(在下文中用EM指代)。
第三步,建立EM与replaySubject和reduce的联系,mutations.scan(initialState, accumulator: reduce),首先使用提供的reduce加工处理状态State,然后在EM的Observer中将处理后的State传递到replaySubject在客户环境的订阅者replaySubject.onNext(output)。
再来看看feedback的原型Feedback,它的定义为public typealias Feedback<State, Mutation> = (ObservableSchedulerContext<State>) -> Observable<Mutation>,输入为ObservableSchedulerContext<State>(replaySubject),输出为Observable<Mutation>(EM)的函数。在Feedbacks.swift文件中提供了多种创建方法,现在参考文档中Counter示例,分析其中的一种。示例主体代码为:
typealias State = Int
enum Mutation {
case increment
case decrement
}
Observable.system(
initialState: 0,
reduce: { (state, mutation) -> State in
switch mutation {
case .increment:
return state + 1
case .decrement:
return state - 1
}
},
scheduler: MainScheduler.instance,
scheduledFeedback:
// UI is user feedback
bind(self) { me, state -> Bindings<Mutation> in
let subscriptions = [
state.map(String.init).bind(to: me.label!.rx.text)
]
let mutations = [
me.plus!.rx.tap.map { Mutation.increment },
me.minus!.rx.tap.map { Mutation.decrement }
]
return Bindings(subscriptions: subscriptions,
mutations: mutations)
}
)
.subscribe()
.disposed(by: disposeBag)
其中bind(self) {...调用的方法为:
public func bind<State, Mutation, WeakOwner>(_ owner: WeakOwner, _ bindings: @escaping (WeakOwner, ObservableSchedulerContext<State>) -> (Bindings<Mutation>))
-> (ObservableSchedulerContext<State>) -> Observable<Mutation> where WeakOwner: AnyObject {
return bind(bindingsStrongify(owner, bindings))
}
客户环境传入的闭包为bindings,是描述为 (WeakOwner, ObservableSchedulerContext<State>) -> (Bindings<Mutation>)的闭包,后面统称为闭包A。
然后通过下面的代码bindingsStrongify(owner, bindings)处理返回一个(O) -> (Bindings<Mutation>)的闭包,后面统称为闭包B,这里的O为ObservableSchedulerContext<State>,相比于闭包A去除了owner。
private func bindingsStrongify<Mutation, O, WeakOwner>(_ owner: WeakOwner, _ bindings: @escaping (WeakOwner, O) -> (Bindings<Mutation>))
-> (O) -> (Bindings<Mutation>) where WeakOwner: AnyObject {
return { [weak owner] state -> Bindings<Mutation> in
guard let strongOwner = owner else {
return Bindings(subscriptions: [], mutations: [Observable<Mutation>]())
}
return bindings(strongOwner, state)
}
}
最后通过下面代码,返回一个闭包(ObservableSchedulerContext<State>) -> Observable<Mutation>则是符合Feedback的闭包,称为闭包C,作为输入传入到Observable.system方法。
public func bind<State, Mutation>(_ bindings: @escaping (ObservableSchedulerContext<State>) -> (Bindings<Mutation>)) -> (ObservableSchedulerContext<State>) -> Observable<Mutation> {
return { (state: ObservableSchedulerContext<State>) -> Observable<Mutation> in
return Observable<Mutation>.using({ () -> Bindings<Mutation> in
return bindings(state)
}, observableFactory: { (bindings: Bindings<Mutation>) -> Observable<Mutation> in
return Observable<Mutation>
.merge(bindings.mutations)
.concat(Observable.never())
.enqueue(state.scheduler)
})
}
}
调用过程,在系统生成的步骤二中,调用feedback(state),将前面的replaySubject传入到闭包C, 通过闭包C 中的bindings(state)将replaySubject传入到闭包B,闭包B将指定的的owner和replaySubject通过bindings(strongOwner, state)传入到客户环境的闭包A中。这就是State关联序列的传入到客户环境的过程。在闭包A中客户环境订阅State序列replaySubject接收反馈。再来分析EM的创建和订阅,通过客户环境的闭包A的执行,创建了相关于Mutation的事件序列组,作为返回对象Bindings的成员,在闭包C中的Observable<Mutation>.using方法,将resourceFactory创建的Bindings对象(调用bindings(state)生成的)作为输入传入到observableFactory,observableFactory进行Mutation关联的事件序列组的merge处理Observable<Mutation>.merge(bindings.mutations)生成EM1,EM1通过using处理生成EM2,EM2返回到系统生成中,通过再次与其他feedback(state)生成的EMn进行merge处理生成了最终的EM,然后系统进入步骤三完成最后的关联。
另一些小积累:
Observable<Mutation>.using 的 resourceFactory输出作为observableFactory的输入在源码Using.swift
let resource = try _parent._resourceFactory()
disposable = resource
let source = try _parent._observableFactory(resource)
,另外source.subscribe(self)将自己作为source的订阅者,分发source的事件。所以,using的订阅者获取的事件都是来自source。对于using中source序列完成后,摧毁resource可通过下面的代码可见:
···
return Disposables.create(
source.subscribe(self),
disposable
)
···
case .completed:
forwardOn(.completed)
dispose()
,对于在RxFeedback中Binding对象中replaySubject订阅返回的对象的摧毁,是由于返回的Binding对象是支持Disposable协议的,在它的实现方法中,对订阅返回对象进行摧毁,如下
public func dispose() {
for subscription in subscriptions {
subscription.dispose()
}
}
,整个调用过程为:using中的resource发出完成命令,调用了dispose()进行释放,释放对象的disposable为Binding对象(disposable = resource),Binding对象调用dispose()方法,逐个调用订阅返回对象的摧毁方法。则也可以看出using如何摧毁resource的。(这里的resource为Binding对象)
Scan新的State状态的保存过程,源码在Scan.swift,注意_accumulate前面的&
try _parent._accumulator(&_accumulate, element)
forwardOn(.next(_accumulate))