本篇包含内容包括
1 RxJava事件流向的基本流程;
2 自己手写一个RxJava的基本流程。
1 RxJava事件流向的基本流程
最开始接触RxJava
时,很多文章把observer
,observable
对象定义为观察者和被观察者。其实这样容易把人给绕晕,直接把observable
看成上游产生事件者,把observer
看出下游接收处理事件者。
RxJava2
最简单的调用方式如下:
Observable.create(new ObservableOnSubscribe<String>() { // ①
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
基本的逻辑是:Observable
通过create
方法创建一个<? extends Observable>
的类,然后通过<? extends Observable>
类调用subscribe
方法,并传入一个观察者observer
。
需要搞定的问题:
问:
create
方法创建的到底是什么类型的实例?
带着问题我们先来看①
处的create
方法,主要涉及的内容如下:
// Observable 类
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); ①
}
// RxJavaPlugins类
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source; ②
}
为避免分散精力,我们只看主流分支,搞清主流分支,所有的脉络就基本清晰。
由上①
处可知,返回的Observable
对象就是方法onAssembly
的返回值,而当onObservableAssembly
为空时,返回值其实就是①
处传进来的ObservableCreate
对象。
所以到此,我们可以回答上面的问题了:create创建的对象就是ObservableCreate。
那么正常逻辑下,我们现在肯定要找ObservableCreate
的subscribe
方法。
嗯嗯~可事情会如我们预期的那么顺利吗?我们在ObservableCreate
类中是找不到subscribe
方法。
问:
subscribe
在哪里?
我们来看看ObservableCreate
类部分源码:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
由上可知ObservableCreate
继承至Observable
,但我们并没有找到subscribe
方法。于是第一个想法就是去父类找,也就是Observable
类中。
// Observable类
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
我的乖乖~,subscribe
竟然是个final
方法,难怪在子类中看不到这个方法,不过我们看到了另一个方法subscribeActual
,点进去一看:
// Observable类
protected abstract void subscribeActual(Observer<? super T> observer);
是个抽象方法,这我们就放心了,有点类似与Android
控件View
中Measure
方法中的onMeasure
。
于是乎,我们就可以回答上面提出的问题。
答:Observable
的subscribe
方法是抽象方法,所有子类继承Observable
后,实现其抽象方法subscribeActual
,进行实际的订阅操作。
于是我们就直接去看ObservableCreate
类中的subscribeActual
方法:
// ObservableCreate类
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); ①
observer.onSubscribe(parent); ②
try {
source.subscribe(parent); ③
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
问:发送消息的基本原理?
①处创建发射器
②处的方法是不是看上去很眼熟,对的,没错,它就是我们观察者Observer
最先被调用的方法。
③处source
就是我们最开始调用的create
方法中的参数ObservableOnSubscribe
,也就是发射器中的参数。
还记得我们最开始那个例子吗?
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
})
......
③处传入的参数就是此处ObservableEmitter
对象的实例,所以每次当
我们利用emitter
发送消息时,就触发了CreateEmitter
类的onNext
方法。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 触发observer对象的onNext方法
observer.onNext(t);
}
}
省略部分代码.....
}
2 自己手写一个RxJava的基本流程
古人云:纸上得来终觉浅,绝知此事要躬行。
我们自己手写一个这个的过程比任何教程都会要记得深刻,牢固。
写我们的Observable
,我把命名都加上一个perry
前缀。
abstract class PerryObservable<T> {
// 订阅开始调用的方法
fun subscribe(observer: PerryObserver<in T>) {
subscribeActual(observer)
}
internal abstract fun subscribeActual(observer: PerryObserver<in T>)
companion object {
// create构造方法
fun <T> create(source: PerryObservableOnSubscribe<T>): PerryObservable<T> {
return PerryObservableCreate(source)
}
}
}
接下来是PerryObservableCreate
。
class PerryObservableCreate<T>(private val source: PerryObservableOnSubscribe<T>) : PerryObservable<T>() {
override fun subscribeActual(observer: PerryObserver<in T>) {
val emitter = PerryCreateEmitter(observer)
observer.onSubscribe(emitter)
source.subscribe(emitter)
}
// 这是我们的发射器
class PerryCreateEmitter<T> internal constructor(private val observer: PerryObserver<in T>)
: PerryDisposable, PerryEmitter<T> {
override fun dispose() {
}
override fun isDisposed(): Boolean {
return false
}
override fun onNext(value: T) {
observer.onNext(value)
}
override fun onError(error: Throwable) {
observer.onError(error)
}
override fun onComplete() {
observer.onComplete()
}
}
}
其他全部都是一些接口,就不一一贴出来啦。
interface PerryEmitter<T> {
fun onNext(@NonNull value: T)
fun onError(@NonNull error: Throwable)
fun onComplete()
}
最终调用方法如下:
PerryObservable.Companion.create(new PerryObservableOnSubscribe<String>() {
@Override
public void subscribe(PerryObservableCreate.PerryCreateEmitter<String> emitter) {
emitter.onNext("hello");
emitter.onNext("world");
}
}).subscribe(new PerryObserver<String>() {
@Override
public void onSubscribe(PerryDisposable d) {
}
@Override
public void onNext(String s) {
Log.d("zp_test", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印日志:
至此,我们基本搞清其发生消息的逻辑,由于篇幅有限,其他更加高级的功能,期待下次再见!