</br>
</br>
2016/07/26
上海
Read the fucking source code。
</br>
RxJava , Reactive Extensions for the JVM 。
Github:https://github.com/ReactiveX/RxJava
</br>
Observable--------------------------生产者ob
Observable.onSubscribe---------------生产者的任务表onSub
Subscriber--------------------------消费者sub
Observable.subscribe(Subscriber);------订阅
</br>
1,最基本的流程
(以下省略适配、优化等细节代码)
observable Aob = observable.create(
new onSubscribe(){
@Override
public void call(Subscriber<? super String> subscriber) {
.......
}
}
);
//Aob创建完毕,持有一个onSubscribe对象,此时是“冷”的,只是一个observable实例,
Aob.subscribe(outSubscriber){
...........
outSubscriber.onStart();
onSubscribe.call(outSubscriber);
........
};
//调用Aob.subscribe(outSubscriber);方法,打开observable的调用链,
//内部调用的是Aob.onSubscribe.call(outSubscriber):
//将outSubscriber传入onSubscribe.Call(outSubscriber),相当于启动onSubscribe,onSubscribe将数据传递给outSubscriber。
容我画个草图:
(1)observable Aob = observable.create(AonSub);
———————————
| Aob |
| 持有 |
| AonSub |
———————————
(2)Aob.subscribe(outSubscriber)
—————————————————————————————————
| Aob |
| 调用 |
| AonSub.Call(outSubscriber) |
—————————————————————————————————
(3)AonSub.Call(outSubscriber)
———————————————————————————————————————
| result为处理结果 |
| outSubscriber.onNext(result) |
| outSubscriber.onCompleted() |
| outSubscriber.onError(Throwable e) |
———————————————————————————————————————
.
</br>
2,map、flatmap、lift的调用流程
(以下省略适配、优化等细节代码)
//创建observable
observable Aob = observable......
//调用Aob.flatMap
Aob.flatMap(
new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
.....
}
}
).subscribe(outSubscriber);//绑定
使用上,调用flatMap操作符后返回observable,紧接着绑定Subscriber,flatMap返回的observable还是Aob?一开始我是这么认为的。
//flatMap()的返回Observable
public final Observable flatMap(Func1 func) {
....
//merge(Observable)操作符将多个Observable合为一,并异步、无序的发射Observable生产的数据,先不管它。
//merge参数为Observable,那map(func)肯定返回Observable。
return merge(map(func));
}
//map()也返回Observable
public final Observable map(Func1 func) {
//越调越远,lift()同样返回一个Observable,OperatorMap是?
return lift(new OperatorMap<T, R>(func));
}
到这先停一下,看看flatMap流程(先忽略merge,涉及到flatMap特性):
flatMap(func)-->map(func)-->lift(OperatorMap(func));
如果用map()操作符,那流程就是:
map(func)-->lift(OperatorMap(func));
核心就在lift与参数OperatorMap对象中。
//OperatorMap实现了Operator,Operator是?
class OperatorMap<T, R> implements Operator<R, T>;
//Operator继承自Func1,并设定Func1的Call方法参数为Subscriber,返回值也是Subscriber
interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>>
OperatorMap实现了Operator,只重写一个Func1.Call方法.
先大概看一下OperatorMap是什么鬼
public final class OperatorMap<T, R> implements Operator<R, T> {
//在调用flatMap传入的Func1是用来表示某种规则和逻辑,用来过滤或“先”处理,最后会传到这里
final Func1<? super T, ? extends R> transformer;
//构造方法传入Func1,就是flatMap传入的Func1
public OperatorMap(Func1<? super T, ? extends R> transformer) {
//赋值成员变量
this.transformer = transformer;
}
@Override
//实现自Operator,Operator又继承自Func1,call方法参数为Subscriber,返回Subscriber。
public Subscriber<? super T> call(final Subscriber<? super R> o) {
//new 一个Subscriber并返回
return new Subscriber<T>(o) {
....
};
}
}
到这,知道了OperatorMap实现了Operator,重写Func1.Call方法,并持有了flatMap(Func1)方法中传入的Func1。至于call方法中传入的Subscriber与返回的Subscriber干嘛用的,不急,先看lift(Operator)。
//参数Operator,map()中传入的是OperatorMap,返回Observable
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//直接new Observable并返回,这个构建方法是protected的
return new Observable<R>(
//Observable少不了OnSub
new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
......
}
});
}
依照之前的调用流程,一路飙到lift(Operator)。
flatMap(func)-->map(func)-->lift(OperatorMap(func));
lift(Operator)返回了一个new Observable,则返回流程是:
return new Observable-->map(func)-->flatMap(func)-->前台;
所以:
Aob.flatMap(...)
.subscribe(outSubscriber);
等价于
Observable Bob = Aob.flatMap(...);
Bob.subscribe(outSubscriber);
在最基本的运行流程中,ob.subscribe(outSubscriber)会激活OnSubscriber,将outSubscriber传入OnSubscriber的call方法中,并经处理后的数据通过outSubscriber.onNext(result)等方法通知前台。
下面我用Aob与Bob解释lift(),看看lift()返回的Observable.onSubscriber.call都做了什么
//只留核心代码
//参数Operator,map()中传入OperatorMap,返回Observable
public Observable lift(operator) {
//直接new Observable并返回,这个构建方法是protected的
return new Observable<R>(
//Observable少不了OnSub
new OnSubscribe<R>() {
//参数 o 就是调用Bob.subscribe(outSubscriber)传入的outSubscriber
public void call(Subscriber<? super R> o) {
.......
//hook.onLift(operator)方法直接将operator返回,真的就一行代码。
//operator就是map()中传入的OperatorMap。
//OperatorMap.call()方法传入一个Subscriber,返回一个Subscriber;
Subscriber<? super T> st = hook.onLift(operator).call(o);
//将OperatorMap.call()返回的Subscriber传给Aob.onSubscribe.call();
onSubscribe.call(st);
.........
}
});
}
1,Bob.subscribe(outSubscriber);
|
2,Bob.OnSubscribe.call(outSubscriber);
|
3,将outSubscriber传给OperatorMap.call(outSubscriber),返回另一个Subscriber,交给Aob.onSubscriber.call()处理。
|
4,没了
WTF?没outSubscriber什么事了?Aob.onSubscriber处理完数据都交给另一个Subscriber处理了,outSubscriber干嘛去了?
回头看看OperatorMap.call()方法
public final class OperatorMap<T, R> implements Operator<R, T> {
...........
@Override
//outSubscriber传给OperatorMap.call(outSubscriber),参数o就是outSubscriber
public Subscriber<? super T> call(final Subscriber<? super R> o) {
//new 一个Subscriber并返回
return new Subscriber<T>(o) {
//调用outSubscriber.onCompleted();
@Overridepublic void onCompleted(){o.onCompleted();}
//调用outSubscriber.onError(e);
@Overridepublic void onError(Throwable e){o.onError(e);}
@Overridepublic void onNext(T t) {
try {
//transformer就是.flatMap时传入的Func1
//t是Aob.onSubScriber.call()中返回的结果
//将t传给Func1.call(t);处理
//并把结果传给outSubscriber.onNext();
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
(下面加粗的这段话容易引起不适,可以跳过直接看后面的伪代码)
调用Bob.subscribe(OutSubscriber);设置绑定,Bob激活,BonSub.Call(OutSubscriber)方法被调用,参数为前台传入的OutSubscriber。
BonSub的Call方法中实际new了一个Subscriber,暂称Bsub。
Bsub包装了BonSub.Call方法参数中的Subscriber,也就是外部传入的OutSubscriber,以及Func1对象。
之后,将Bsub传入AonSubscribe.Call(Bsubscriber);
AonSub的处理结果会传给Bsub的对应方法,在Bsub.onNext(T Value)方法中,将结果与Bfunc1融合并传给OutSub处理,OutSub.onNext(Bfunc1.Call(Value));
</br>
....
</br>
哈哈,我还是用笨办法来模拟下flatMap的思路吧。
(以下为伪代码)
//ob指代Observable、
//onSub指代onSubscribe、
//sub指代Subscriber。
AonSub = new onSubscribe(){....//做某些事.....};
Afunc1 = new Func1(){ T Call(t){.....//数据处理.....}; }
//先建一个Bob的onSub;
BonSub = new onSubscribe(){
//绑定
call(outSubscriber){
//新建一个sub
Subscriber Bsub = new Subscriber(){
//AonSub.call处理结果返回,再给Afunc1.call处理,结果传给outSubscriber
onNext(t){ outSubscriber.onNext(Afunc1.call(t)); }
onErro(o){ outSubscriber.onErro(o); }
oncom...同上...
}
//将新建的Bsub传给AonSub.call处理
AonSub.call(BonSub );
}
}
//绑定
BonSub.call(outSubscriber);
</br>
3,subscribeOn的调用流程
(以下省略适配、优化等细节代码)
//伪代码
OnSubscribe AonSub = new OnSubscribe(){...};
Subscriber Asub = new Subscriber(){....};
observable Aob = Observable.create( AonSub );
Aob
.subscribeOn(Schedulers.io()) //指定AonSub.call()运行在io线程
.subscribe(Asub);
在上面的例子中:
.subscribeOn(Schedulers.io()) //指定AonSub.call()运行在io线程。
而内部的实现思路与flatMap、map操作符的实现思路异曲同工。
//返回Observable
public final Observable<T> subscribeOn(Scheduler scheduler) {
....
//调用Aob.create(new OnSubscribe);构建Observable并返回
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
//OperatorSubscribeOn实现了OnSubscribe
public final class OperatorSubscribeOn<T> implements OnSubscribe<T>
subscribeOn中利用observable.create(OnSubscribe)新建observable,并返回。传入的OperatorSubscribeOn实现了onSubscriber。
//因此,第一部分的实例代码可以写成这样
//伪代码
OnSubscribe AonSub = new OnSubscribe(){...};
Subscriber Asub = new Subscriber(){....};
observable Aob = Observable.create( AonSub );
observable Bob = Aob.subscribeOn(Schedulers.io());
Bob.subscribe(Asub);
没问题吧?现在就剩下OperatorSubscribeOn是什么了。毕竟observable中OnSubscribe才是核心。
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
//传进来的Scheduler对象,可以创建工作线程
final Scheduler scheduler;
//传进来的Observable对象
final Observable<T> source;
//构造方法
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
//source为调用subscribeOn的observable
this.source = source;
}
//ob.subscribe(sub);绑定,sub会传到这里
public void call(final Subscriber<? super T> outSubscriber) {
//创建工作线程
final Worker inner = scheduler.createWorker();
...
//在Worker上运行Action0.call()方法
inner.schedule(new Action0() {
@Override
public void call() {
...
//新创建了一个Subscriber,装饰者模式
Subscriber<T> s = new Subscriber<T>(subscriber) {
public void onNext(T t) {
subscriber.onNext(t);
}
public void onError(Throwable e) {
subscriber.onError(e);
}
public void onCompleted() {
subscriber.onCompleted();
}
...反背压相关
};
//调用observable的unsafeSubscribe(Subscriber)方法
source.unsafeSubscribe(s);
}
});
}
}
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
...
//这句不解释了
subscriber.onStart();
//hook.onSubscribeStart(this, onSubscribe)将参数onSubscribe原样返回,真的就一行代码
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
//hook.onSubscribeReturn(subscriber)方法参数为Subscription,
//Subscriber类型实现了Subscription,
//方法中将传入的subscriber原样返回,真的就一行代码
return hook.onSubscribeReturn(subscriber);
...
}
(下面加粗的这段话容易引起不适,可以跳过直接看后面的伪代码)
1,Bob.subscribe(outSubscriber);绑定后,将outSubscriber传入BonSub.Call(outSubscriber);
2,BonSub.Call(outSubscriber)中新建Bsub,Bsub的onNext()方法中调用outSubscriber的onNext()方法,onErro()、onCompleted()也一样。
3,新建Bsub之后利用线程调度器,在指定线程中运行Aob.AonSub.Call(Bsub);
4,Aob.AonSub.Call(Bsub)中将处理结果传给Bsub.onNext(result)、onErro(throwable)、onCompleted(),这一步是同步的。
5,outSubscriber.onNext(result)、onErro(thrwable)、onCompleted()同样运行在指定线程
重复4~5步,直到数据处理完毕
...还是看伪代码吧,用笨办法展开实现
//准备工作
OnSubscribe AonSub = new OnSubscribe(){......}
Subscriber outSub = new Subscriber(){
onNext(result){ ...数据处理 }
onErro.....
onComp...
};
//-------------正确的使用subscribeOn-----------------
Observable.create( AonSub )
.subscribeOn(Schedulers.io()) //指定AonSub.call()运行在io线程
.subscribe(outSub);//完毕,AonSub 中的逻辑运行在制定代码
//-------------下面是笨办法展开实现-------------------
//------subscribeOn(Schedulers.io())做了下面这些事
OnSubscribe BonSub = new OnSubscribe(){
//外部传入outSubscriber
call( outSubscriber ){
//装饰者模式
Subscriber Bsub = new Subscriber(){
//Bsub 包装Asub
onNext(result){ outSub.onNext(result) }
...
};
//这里只是想表示在指定线程运行
new Thread(){
run(){
//指定线程中运行 AonSub.call的逻辑
//并将装饰者Bsub传入,用以响应处理结果
AonSub.call( Bsub );
}
}
}
};
//创建Bob
observable Bob = Observable.create( BonSub );
//------对应subscribe(outSub)
Bob.subscribe(outSub);//完毕,AonSub 中的逻辑同样运行在指定线程
</br>
4,observeOn的调用流程
(以下省略适配、优化等细节代码)
//伪代码
OnSubscribe AonSub = new OnSubscribe(){...};
Subscriber Asub = new Subscriber(){....};
observable Aob = Observable.create( AonSub );
Aob
.observeOn(Schedulers.immediate()) //指定Asub运行线程运行在当前线程。
.subscribe(Asub);
在上面的例子中:
.observeOn(Schedulers.immediate()) //指定Asub运行线程运行在当前线程。
而内部的实现思路与flatMap、map操作符的实现思路异曲同工。
//返回Observable对象
public final Observable<T> observeOn(Scheduler scheduler) {
.....
//又是lift()
return lift(new OperatorObserveOn<T>(scheduler, false));
}
又是lift();前面说的flatMap、map中都用到了lift,再来回顾一下lift();
//参数Operator继承自Func1,只是一个接口
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//返回新的Observable
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
..
//新建Subscriber包装o,通过operator转换器的call方法获得
Subscriber<? super T> st = hook.onLift(operator).call(o);
..
st.onStart();
//将新建Subscriber传给onSubscribe
onSubscribe.call(st);
..
}
}
);
}
转换器Operator真是神器。
让我们看看observeOn中,调用lift时传入的OperatorObserveOn都做了什么吧。
public final class OperatorObserveOn<T> implements Operator<T, T> {
//线程调度器
private final Scheduler scheduler;
//是否优先发送正常数据,最后发送错误报告
private final boolean delayError;
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this.scheduler = scheduler;
this.delayError = delayError;
}
@Override
//参数与返回值都是Subscriber
public Subscriber<? super T> call(Subscriber<? super T> child) {
...判断,如果指定线程为当前线程或队列线程则直接返回child...
//ObserveOnSubscriber继承自Subscriber
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
//背压相关,这里就不列了
parent.init();
//返回ObserveOnSubscriber
return parent;
}
}
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
套路很像flatMap对吧,包装包装再包装。
返回的ObserveOnSubscriber对象,作为包装类,是observeOn的核心。
//只留下了核心逻辑代码
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
//外部传入的Subscriber,包装包装,包的就是child
final Subscriber<? super T> child;
//worker
final Scheduler.Worker recursiveScheduler;
//队列,规避背压的一种方案
final Queue<Object> queue;
....
//构造方法,线程调度器与被包装Subscriber
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
//赋值
this.child = child;
//赋值
this.recursiveScheduler = scheduler.createWorker();
...
}
public void onNext(final T t){
...
//调用了schedule()
schedule();
}
public void onCompleted(){
...
//调用了schedule()
schedule();
}
public void onError(final Throwable e){
...
//调用了schedule()
schedule();
}
//线程跳转,将结果返回
protected void schedule() {
...
//工作线程运行call();
recursiveScheduler.schedule(this);
...
}
//运行在工作线程中
public void call() {
...大面积反背压代码以及onError(e)、onCompleted()的回调...
//赋值,child是外部传进来的,被包装Subscriber
final Subscriber<? super T> localChild = this.child;
//将结果传给被包装的Subscriber
localChild.onNext(localOn.getValue(v));
},
}
(下面加粗的这段话容易引起不适,可以跳过直接看后面的伪代码)
observeOn与subscribeOn原理上也是很像的,只是subscribeOn针对的是onSubscriber,ObserveOn针对的是Subscribe。
使用subscribeOn,先切换线程,再执行onSubscriber任务表。
使用ObserveOn,先切换线程,再给subscriber发射onSubscriber的处理结果。
使用笨办法展开实现
//伪代码
//准备
Observable AonSub = new onSubscribe(){.......};
subscriber outSub = new subscriber(){.....}
//----正确姿势----
Observable.create( AonSub )//创建Observable
.subscribeOn( ... );//线程转换
.subscribe( outSub );//绑定
//----展开实现----
//新建BonSub;
BonSub = new onSubscribe(){
//绑定,传入外部Sub
call( outSub ){
//新建subscriber,包装外部subscriber
Bsub = new subscriber(){
//new Thread()只是想表示”指定线程“
onNext( result ){ new Thread(){ run(){ outSub.onNext( result ) } }; }
//new Thread()只是想表示”指定线程“
onErro( e ){ new Thread(){ run(){ outSub.onErro( e ) } }; }
//new Thread()只是想表示”指定线程“
onCompleted(){ new Thread(){ run(){ outSub.onCompleted() } }
}
//将新建subscriber传给AonSub.call( BonSub ),响应数据
AonSub.call( BonSub );
}
}
//新建Observable
Observable Bob = Observable.create( BonSub )
//新建Observable 绑定消费者
Bob.subscribe( outSub );
</br>
5,subscribeOn、observeOn混合调用流程
(以下省略适配、优化等细节代码)
subscribeOn针对的是onSubscriber,ObserveOn针对的是Subscribe。
(一)
//伪代码, ob指Observable
Aob= Observable.create( AonSub )
Bob= .subscribeOn(...)
Cob= .observeOn(...)
Cob .subscribe( outSub );
(二)
//伪代码, ob指Observable
Aob= Observable.create( AonSub )
Bob= .subscribeOn()
Cob= .subscribeOn()
Dob= .observeOn()
Dob .subscribe( outSub );
(三)
//伪代码, ob指Observable
Aob= Observable.create( AonSub )
Bob= .subscribeOn()
Cob= .observeOn()
Dob= .subscribeOn()
Dob .subscribe( outSub );
</br>
6,flatMap、map、ObserveOn、subscribeOn混合调用流程
(以下省略适配、优化等细节代码)
subscribeOn针对的是onSubscriber,ObserveOn针对的是Subscribe。
flatMap的融合Func1这一步是在包装Subscriber中完成的
(一)
//伪代码, ob指Observable
Aob= Observable.create(...)
Bob= .subscribeOn(...)
Cob= .flatMap(...)
Dob= .observeOn(...)
.subscribe( outSub );
(二)
//伪代码, ob指Observable
Aob= Observabale.create(...)
Bob= .subscribeOn(...)
Cob= .observeOn(...)
Dob= .flatMap(...)
.subscribe( outSub );
(三)
//伪代码, ob指Observable
Aob= Observable.create(...)
Bob= .subscribeOn(...)
Cob= .flatMap(...)
Dob= .observeOn(...)
Eob= .flatMap(...)
Fob= .observeOn(...)
.subscribe( outSub );