Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。
Observable#subscribeOn(Scheduler)
在 Android 中,我们知道默认都是执行在主线程的,那么 Rxjava 是如何实现线程切换的。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
我们先来看一下 subscribeOn 方法,可以看到
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
// scheduler 判空
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 用 ObservableSubscribeOn 将 scheduler 包装 起来
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
从 [图片上传失败...(image-8534da-1545226643196)] ,我们知道,当我们调用 observable.subscibe(observable) 的时候,最终会调用到具体的 observable 的实例的 subscribActual 方法。而这里具体的 observable 的实例为 ObservableSubscribeOn。
接下来,我们来看一下 ObservableSubscribeOn 这个类,可以看到继承 AbstractObservableWithUpstream ,而 AbstractObservableWithUpstream 继承 Observable,实现 HasUpstreamObservableSource 这个接口。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
---
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
public interface HasUpstreamObservableSource<T> {
/**
* Returns the upstream source of this Observable.
* <p>Allows discovering the chain of observables.
* @return the source ObservableSource
*/
ObservableSource<T> source();
}
observableSubscribeOn 的 subscribeActual 方法,跟 ObservableCreate 的 subscribeActual 的套路差不多,它也是 Observable 的一个子类。只不过比 ObservableCreate 多实现了一个接口HasUpstreamObservableSource,这个接口很有意思,他的 source() 方法返回类型是 ObservableSource(还记得这个类的角色吗?)。也就是说 ObservableSubscribeOn 这个 Observable 是一个拥有上游的 Observable 。他有一个非常关键的属性 source,这个 source 就代表了他的上游。
接下来我们一起来看一下 ObservableSubscribeOn 的具体实现
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
首先先来看他的构造函数 ,有两个参数 source ,scheduler。
source 代表上游的引用,是 Observable 的一个实例
scheduler 可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例
这里我们先大概了解一下 Scheduler 是个什么,Scheduler 里面封装了 Worker 和 DisposeTask,下面会详细讲到。
Schedulers.newThread()
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}