RxJava
rxjava里最重要一条脉络,如map和线程池都是这样的
他们会有个类包裹上一层和转换(map,线程),包裹类会持有上一层被观察者当有订阅的时候的时候。
会调用最底层被观察者(大概率是包裹)的subcribeActual,并传入观察者,包括会拿出上一层被观察者source,用来与观察者建立联系,其实包裹执行完subcribeActual 吧上一层被观察者与观察者进行捆绑以后,感觉就没什么用了
包裹1,ObservableMap
public void subscribeActual(Observer<? super U> t) {
//上一层source和 t 建立联系
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}
包裹2 ObservableSubscribeOn,线程切换
```
public void subscribeActual(Observer<? super T> observer) {
ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(observer);
//这个永远在当前线程进行
observer.onSubscribe(parent);
//将source和观察者绑定关系 设置runable的run方法里,通过线程池执行,这样,调用source的subcribeActure就是子线程里,最后到ObservableCreate的subcribeActure,里面全部都在所定义的线程里
parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final ObservableSubscribeOn.SubscribeOnObserver<T> parent;
SubscribeTask(ObservableSubscribeOn.SubscribeOnObserver<T> parent) {
this.parent = parent;
}
public void run() {
//在这里建立的绑定关系
ObservableSubscribeOn.this.source.subscribe(this.parent);
}
}
## 包裹3
```
protected void subscribeActual(Observer<? super T> observer) {
if (this.scheduler instanceof TrampolineScheduler) {
this.source.subscribe(observer);
} else {
Worker w = this.scheduler.createWorker();
this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
}
}
```
看看ObserveOnObserver
```
public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 2) {
this.queue.offer(t);
}
this.schedule();
}
}
void schedule() {
if (this.getAndIncrement() == 0) {
this.worker.schedule(this);
}
}
```
## 思考
如果先定义 observeOn(AndroidSchedulers.mainThread()) 中间执行一个map转换
再调用.subscribeOn(Schedulers.io()),那map转换所发生的线程是什么
subscribeOn 是将你的建立绑定关系放在对应线程 ,observeOn 是将onNext方法放在对应线程
那肯定是 observeOn 的切换在后,所以应该再主线程
再来一个问题,如果是 subscribeOn(1). subscribeOn(2). subscribeOn(3),那么被观察者发射数据是会在1线程里,因为subscribeActual建立订阅关系从下往上
observeOn(1). observeOn(2) . observeOn(3)
观察者收到数据会在线程3里,因onNext是从上往下