Observable.create(
//原始被观察者
new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter)throws Exception {
emitter.onNext("xxx");
}
})
.subscribeOn(Schedulers.io())
.subscribe(
//最终观察者
new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1.每个切块都有自己的被观察者,观察者。并用source持有上游被观察者,在开始创建的时候传入。
2.遇见最终观察者调用subscribeActual()方法,创建自己的观察者并用observer持有下游观察者。调用source.subscribe(parent)方法。source时上游被观察。
3.遇到最开始的被观察者对象调用在subscribe()方法中调用parent.onNext()方法,向下游传递,在下游onNext()方法中执行完自己的需求后使用observer.onNext()继续传递。
设计模式:
1.观察者模式
2.使用自己的观察者包装下游观察者是装饰者模式
3.通过构造器传递上游被观察者,执行subscribeActual方法传递下游观察者,是模版设计模式