RxJava大概流程:
1.Observable.create 创建事件源,但并不生产也不发射事件。
代码创建被观察者Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onComplete();
}
});
源码跟进Observable.create
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
//ObservableOnSubscribe是个接口,只包含subscribe方法。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");//判段是否为空
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
继续跟进RxJavaPlugins.onAssembly方法
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
通过调试得知静态对象onObservableAssembly默认为null, 所以此方法直接返回传入的参数source。
由此可以看出来整个Observable.create方法就是创建了个事件源new ObservableCreate()对象,将ObservableOnSubscribe作为参数传递给ObservableCreate的构造函数。事件是由接口ObservableOnSubscribe的subscribe方法上产生的,但此时并不生产也不发射事件。
2.实现observer接口,此时没有也无法接受到任何发射来的事件。
代码创建观察者Observer
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String str) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
源码跟进Observer
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
Observer就是个接口,包含了上面四个方法,所以此时没有也无法接受到任何发射来的事件
3.订阅 observable.subscribe(observer), 此时会调用具体Observable的实现类中的subscribeActual方法,此时才会真正触发事件源生产事件,事件源生产出来的事件通过Emitter的onNext,onError,onComplete发射给observer对应的方法由下游observer消费掉。从而完成整个事件流的处理。
代码关联观察者和被观察者
observable.subscribe(observer);
代码跟进subscribe
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//hook ,默认直接返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//真正实现订阅的方法。
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
跟进subscribeActual方法
//subscribeActual 是抽象方法,所以需要到实现类中去看具体实现,也就是上文中1.提到的ObservableCreate中
protected abstract void subscribeActual(Observer<? super T> observer);
public ObservableCreate(ObservableOnSubscribe<T> source) {
//事件源,生产事件的接口
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//直接回调了观察者的onSubscribe 并传回了Disposable
//observer中可以利用Disposable来随时中断事件流的发射。
observer.onSubscribe(parent);
try {
// 调用了事件源subscribe方法生产事件,同时将发射器传给事件源。
// 现在我们明白了,数据源生产事件的subscribe方法只有在observable.subscribe(observer)被执行后才执行的。 换言之,事件流是在订阅后才产生的。
// 而observable被创建出来时并不生产事件,同时也不发射事件。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
现在可以看出来,只有subscribe订阅后,数据源才会开始生产事件和发射事件。
接下来看看事件是如何被发射出去的,看下CreateEmitter源码
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {}
CreateEmitter 实现了ObservableEmitter接口,同时ObservableEmitter接口又继承了Emitter接口。
CreateEmitter 还实现了Disposable接口,这个disposable接口是用来判断是否中断事件发射的。
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
看到这就能看出来,当订阅成功后,数据源ObservableOnSubscribe开始生产事件,调用Emitter的onNext,onComplete向下游发射事件,Emitter包含了observer的引用,又调用了observer onNext,onComplete,这样下游observer就接收到了上游发射的数据。
RxJava的线程调度机制
感谢大神们的心得:
https://www.cnblogs.com/linghu-java/p/9719427.html