写在前面的话
看了源码也不少,但是每次看源码都仅仅,也就是仅仅跟着作者一步步跟进ta的方法,点进去,再点进去,再再点进去,渐渐的感觉自己当时看的太肤浅,有很多问题都会冒出来,最多的就是为什么要这样写呢,一追究这个问题就会发现自己在知识的海洋里是如此的渺小。写了这么多,希望能够在以后眼光放高一些,看大局,看别人是如何排兵布阵!
rxjava执行原理
先从调用rxjava功能的代码入手,首先最简单的调用就是:
Observable.create(new Observable.OnSubscribe<Integer>() {//创建
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
Observable.create()
首先调用
Observable.create()
创建一个生产者Observable
,同时创建了一个OnSubscribe
作为其参数传给Observable
OnSubscribe
是Observable
的一个内部接口,从它的注释"* Invoked when Observable.subscribe is called."可以看出当生产者被消费者订阅的时候,它将会被激活。-
再看方法内部:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
-
接着我们看到创建的
OnSubscribe
又传入了RxJavaHooks.onCreate(f)
中:public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
这里判断了
RxJavaHooks
中的成员变量onObservableCreate
是否为空,在RxjavaHooks
的内部有一个静态方法initCreate()
可以对其进行初始化,但是我们并没有调用它,所有我们最后返回的onSubscribe
依然还是我们自己创建的onSubscribe
。-
倒回至Observable的构造函数,最后将这个
onSubscribe
赋值给了Observable的成员变量。protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
Subscriber
创建了生产者Observable
,那么肯定还要创建消费者Observer
,Subscriber
就是Observer
接口的一个实现类,同时还实现了Subscription
接口,这个接口的方法unsubscribe()
用于取消订阅,还有一个isUnsubscribed()
方法判断订阅的状态,unsubscribe()
这个方法很关键,因为在 subscribe()
之后, Observable
会持有 Subscriber
的引用,这个引用如果不能及时被释放,将有内存泄露的风险。
subscribe()
-
接下来就是
subscribe()
方法:static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // new Subscriber so onStart it subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber); } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... } return Subscriptions.unsubscribed(); } }
首先
subscriber = new SafeSubscriber<T>(subscriber)
会将我们自己写的subscriber
进行包装,其实也就是代理的设计模式,在我们写的代码中通过代理进行一些安全校验,这里就保证了onCompleted()
和onError()
只会有一个执行切只执行一次。接着看
RxJavaHooks.onObservableStart(observable, observable.onSubscribe)
就会发现,同样该方法返回的就是我们创建的onSubscribe
,之后还调用了它的call()
方法,也就成了
调用我们自己写的call()
方法。而
call()
方法中的参数subscriber
其实就是我们在调用subscribe()
订阅时,作为参数传进来的subscriber
,这样也就让我们在生产者这里调用到了消费者的方法,这样也就达到了观察者的目的。
rxjava变换原理
rxjava的变换虽然功能各有不同,但实质上都是针对时间序列的处理和再发送,这里我们就通过map()
来了解其中的原理,一下就是利用map
的一段代码:
Observable.just(1,2,3)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "i = " + integer;
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("s : " + s);
}
});
接着看map()
方法:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
是不是感觉很熟悉,返回的就是创建Observable
的方法,也就是说它将我们map()
中的func
作为onSubscribeMap
的构造参数,那么onSubscribeMap
又是什么,它其实就是OnSubscribe
的一个实现类,先看其构造方法:
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
这里通过内部的成员变量保存了传递进来的Observable
和Func1
,记住,这里也就是说在外头调用该方法的Observable
,也就是源Observable
保存在了source
,而我们在map()
中写的Func1()
保存在了transformer
中,在往后看.subscribe()
,注意调用订阅方法的已经不是源Observable
,而是通过map()
内部自己创建返回的一个新Observable
,也就是说新的Observable
持有了Subscriber
的对象,那么,订阅了之后自然就会激活Observable
发射数据,也就是onSubscribe
中的call()
方法开始执行,在这里就是onSubscribeMap
的call()
方法。
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
代码开头就创建了一个MapSubscriber
,它是Subscriber
的一个子类,构造函数如下:
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
联系上头代码,可以发现它将传入的Subscriber
包装成了MapSubscriber
,同时还讲源Subscriber
和Func1
保存在了成员变量中,之后执行的时候肯定要执行onNext()
方法,直接看onNext()
:
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
这里有一个泛型R,也就是返回值类型,是通过构造函数传入的,最重要的代码result = mapper.call(t)
,这里的mapper
就是我们在map()
中写的Func1
,接着传入参数,通过我们自己的方法得到我们想要的结果返回。