RxJava3.0已经发布了,但是这里还是以RxJava2.x来分析部分源码。RxJava采用的是响应式编程的原理,采用观察者模式。
一、RxJava案例和流程
Observable<String> observable = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("hello");
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
这是一个标注的RxJava使用例子,其实这个流程就是先从上往下一级一级的封装对应的Observable,然后再从下往上通过Observable.subscribe订阅向上传递Observer观察者,最后ObservableCreate中将Observer进行封装CreateEmitter,调用ObservableOnSubscribe.subscribe(emitter)用于业务中通知,CreateEmitter是ObservableEmitter子类实现,是ObservableCreate的静态内部类
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
从CreateEmitter的内部onNext方法可以看出,其内部就是通过调用Observer.onNext通知观察者结果。其实整个流程来看,Observable向下一层层进行封装,通过钩子函数的方式,而Observer从下向上进行封装传递,而且业务都是在Observer中的onNext执行的,比如.map()中定义的Function,其实就是在MapObserver中的onNext中执行
二、分析每步的源码
RxJavaPlugins.onAssembly
在具体分析之前,首先先看RxJavaPlugins.onAssembly的具体实现,RxJavaPlugins.onAssembly方法可以说贯穿整个RxJava流程,RxJavaPlugins.onAssembly方法目的就是作为一个钩子函数,将之前的Observable进行封装,变成一个新的Observable。
在上面的例子中,这里的f对象一直都是null。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
1.Observable.subscribe
所有的Observable以及子类调用subscribe方法时,都是调用Observable.subscribe(),所以在上述流程中,第一个调用subscribe()方法的就是ObservableObServeOn这个Observable子类,ObservableObServeOn是在observeOn()方法中对上一个Observable进行封装创建的。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
2.ObservableObserveOn
ObservableObserveOn对象其实就是在调用ObserveOn的时候创建的。封装当前的Observable即在observeOn方法调用之前最新创建的Observable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
// 这里的source其实就是Observable实现类,在上面的例子中,其实就是ObservableSubscribeOn对象
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 封装Observer,然后通过调用上一个Observable.subscribe,向上传递Observer观察者
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
在这里创建了一个Scheduler.Worker,Scheduler是一个线程调度类,Worker其实就是针对线程调用的工作者。而Scheduler会通过不同的子类实现,将当前observeOn定义的线程,即让在observeOn之前封装的Observer都在该线程之前(Observer是从下向上传递的)。
ObserveOn与SubscribeOn有点相反,ObserveOn针对的是Observer,即观察者,观察者是从下向上传递封装的,而ObserverOn中接收到的观察者,其实是其下游封装之后的观察者,所以ObserveOn针对的是其下游
具体分析observeOn(AndroidSchedulers.mainThread())
(1)AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
(2)AndroidSchedulers.MAIN_THREAD
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
(3)AndroidSchedulers.MainHolder.DEFAULT
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
所以AndroidSchedulers.mainThread()的线程切换到主线程,其实就是交由Handler来实现。HandlerScheduler其实是Scheduler的子类,依然是用来创建Scheduler.Worker实例,然后通过Worker.schedule方法进行线程切换,将之前的线程切换到主线程。
(4)observeOn对应的Observable->ObservableObserveOn
而ObservableObserveOn中也有一个Scheduler实例,这个实例其实就是HandlerScheduler对象。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
(5)Scheduler.Worker w = scheduler.createWorker();
这里其实就是调用了HandlerScheduler这个Scheduler的createWorkder()方法
// HandlerScheduler.createWorker()
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
(6)线程切换
而ObserveOnObserver中的线程切换,其实就是调用ObserveOnObserver的schedule()方法实现的,而ObserveOnObserver中的Scheduler.Worker worker对象,是一个HandlerWorker对象。
// ObserveOnObserver类中的部分方法
@Override
public void run() {
if (outputFused) {
// 直接执行下游Observer.onNext方法。
drainFused();
} else {
// 从队列中取任务
drainNormal();
}
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里的this,其实就是ObserveOnObserver对象,而在ObserveOnObserver中,都会将要执行的task放到队列中。而ObserveOnObserver本身就是一个Runnable
(7)HandlerSchedule.HandlerWorker.schedule
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
// 这里的run,其实就是ObserveOnObserver对象
run = RxJavaPlugins.onSchedule(run);
// 封装ObserveOnObserver对象,ObserveOnObserver本身就是一个Runnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
// 通过Handler执行消息,进而达到切换到主线程的目的
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
所以从上面的代码可以看出,其实ObserveOn的切换任务,首先会接收到会被后一个Observer调用onNext触发调用ObserveOnObserver的onNext()方法(Observer从下向上封装,所以下面是前一个,上面是后一个)然后就会调用ObserveOnObserver的schedule()方法,触发Handler同步执行任务,而封装的Runnable其实就是在其run()方法中调用了ObserveOnObserver的run()方法调用了前一个(下游)的Observer的onNext()方法,将结果转换线程回调给了观察者
所以是针对ObserveOn下游代码的。
3.ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}
我们知道,subscribeOn只有设置第一次有效,这其实是因为SubscribeOnObserver.setDisposable方法中,调用的是DisposableHelper.setOnce(this, d);方法
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
...
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
而DisposableHelper.setOnce方法的实现如下:
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
而当前subscribeOn对应的Observable即ObservableSubscribeOn中的subscribeActual方法中,针对之前传进来的Observer做了封装,而这里的source其实就是在subscribeOn之前的Observable,所以在subscribeTask中的run方法中调用source.subscribe(parent);其实就是将subscribeOn之前的逻辑运行的线程切换到了subscribeOn指定的线程。而subscribeOn后面部分的代码,如果没有指定线程切换,都是在subscribeActual中调用source.subscribe()的,所以并不会在subscribeOn指定的线程中执行
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
如果调用多次的subscribeOn,其实每次的线程切换都会生效,但是最终只有第一个调用的subscribeOn会生效的,这个原因其实就是subscribeOn是切换其上游的线程,而subscribeOn线程切换,其实切换的就是source.subscribe(parent)所在的线程,如果create().subscribeOn,那么subscribeOn切换的就是create()中的source(即ObservableCreate)所在线程。subscribeOn创建对应的ObservableSubscribeOn这个Observable是从上向下的,但是调用subscribeActual,封装Observer是从下向上的,所以就算多次使用subscribeOn进行线程的切换,最终只有第一个subscribeOn生效,即最后被调用subscribeActual的ObservableSubscribeOn生效了。
subscribeOn与ObserveOn不同的是,ObserveOn切换的是Observer的线程,而subscribeOn切换的是Observable的线程。所以,subscribeOn是影响上游的操作,而observeOn影响的是其下游的操作
看这个图,其实ObservableSubscribeOn就是subscribeOn创建的对应的Observable,在这个Observable的subscribeActual方法中,其实就是线程切换执行任务,而对应的Runnable.run方法中调用的其实就是source.subscribe(),即ObservableSubscribeOn内部封装的Observable.subscribe线程做了切换调度,而ObservableSubscribeOn内部封装的Observable是subscribeOn上游创建的Observable
针对parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));做分析
(1)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
(2)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建工作对象,内部通过线程池执行runable任务,这个任务其实就是SubscribeTask
// 创建Scheduler.Worker实例,其目的就是为了创建EventLoopWorker实例以及其内部的ThreadWorker实例
// 最终的线程池调用就是通过ThreadWorker内部的线程池来进行,这样就将任务交给了ThreadWorker中的线程池
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 封装Runable,其实还是一个Runbale,只不过多实现了Disposable接口
// 这是为了可以在中断任务的时候,将这个异步执行的任务中断
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这里的createWorker()方法,其实是调用的IoScheduler.createWorker方法。
(3)IoScheduler.createWorker
@NonNull
@Override
public Worker createWorker() {
// 创建Scheduler.Worker实例对象,这时会构造器内部创建ThreadWorker实例
// ThreadWorker实例是从CachedWorkerPool实例调用get()方法创建的
return new EventLoopWorker(pool.get());
}
这里的pool是一个AtomicReference<CachedWorkerPool>对象,get()得到的是一个线程池包装类。
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
// 如果cached中没有ThreadWorker,则会创建一个,并且在ThreadWorker构造器中会创建线程池ScheduledExecutorService executor;
// 这个是因为ThreadWorker构造器执行父类构造器的时候创建的
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...
}
(4)第三步里的w.schedule其实就是调用IoScheduler中的内部类EventLoopWorker.schedule
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
而threadWorker其实就是通过CachedWorkerPool.get()得到的。
而threadWorker其实就是ThreadWorker,是IoScheduler的内部类,是NewThreadWorker的子类实现。所以这里调用scheduleActual,其实就是调用NewThreadWorker.scheduleActual
(5)NewThreadWorker.scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// executor其实就是线程池对象。sr就是要执行的任务,
// executor这个线程池是在创建ThreadWorker的时候初始化创建的
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
而任务中,其实就是ObservableSubscribeOn的静态内部类实现对象,SubscribeTask。而SubscribeTask这个Runnable的run方法中是调用了source.subscribe(),这个source其实就是在subscribeOn之前封装的Observable实例,所以这里的线程池异步调用的时候,其实就是执行subscribeOn之前的Observable.subscribe过程,所以subscribeOn是针对代码上游的线程切换。
但是Observer.onSubscribe并不会为异步,subscribeOn的线程切换不会针对onSubscribe,调用subscribe的Observable在什么线程,则onSubscribe就在什么线程中执行。而onError和onComplete都是会因为线程切换而影响。
4.ObservableMap
其实map操作符,内部创建的是ObservableMap,会传入Function对象,而Function对象是被封装在MapObserver这个Observer中的,当从上游向下调用发送Observer.onNext的时候,就会在MapObserver中触发Function中的操作。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
...
}
5.ObservableCreate
这里就看关键部分的源码,即subscribeActual的实现
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate类 = Observable的子类
...
// 仅贴出关键源码
final ObservableOnSubscribe<T> source;
// 构造函数
// 传入了传入source对象 = 手动创建的ObservableOnSubscribe对象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
/**
* 重点关注:复写了subscribeActual()
* 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
* 该方法,是在被观察者调用subscribe()方法与观察者绑定的时候,调用的。
**/
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1. 创建1个CreateEmitter对象(封装成1个Disposable对象)
// 作用:发射事件
// CreateEmitter类中是对观察者的一个封装类,用于被观察者变化时向观察者分发事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2. 调用观察者(Observer)的onSubscribe()
// onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
// Observer对象的onSubscribe方法实现
observer.onSubscribe(parent);
try {
// 3. 调用source对象的subscribe()
// source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象
// subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()->>分析2
// 这里调用的,其实就是在Observable.create方法中,
// 实现ObservableOnSubscribe接口的时候,实现的subscrebe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
从这部分代码可以看出,在ObservableCreate.subscribeActual内部会调用ObservableOnSubscribe.subscribe,但是在调用这个方法之前,会调用观察者的onSubscribe()方法,其实就是事件开始。
三、操作符分析
1.分析map和flatMap的区别
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
要分析两个操作符的区别,首先看源码。
比如在map和flatMap两个操作符中,分别做网络请求,那么在map中,就会有问题。
因为如果在flatMap中执行Retrofit请求接口,返回的是一个Observable<T>,那么可以看出,如果是使用flatMap的话,则flatMap中的泛型<R>其实是接口请求的返回值的Observable<T>的泛型T,而如果是使用map的话,那么map中的泛型<R>其实就是接口请求返回值Observable<T>,那么map的返回值就会变成Observable<Observable<T>>,这结构出现了变化
所以map主要是用来做数据类型的转换的,从一个数据类型转为另外一个数据类型,而不会影响这个数据类型在Observable的,比如转换之前是Observable<T>,转换之后变成Observable<R>,依然是Observable结构的,转换的只是Observable中的泛型的类型。
而flatMap的话,可以使用Observable的泛型类型数据,得到一个新的Observable,然后使用这个新的Observable替代了旧的Observable
flatMap在实际应用场景中,可能会出现一个接口的请求你数据需要借助于前一个接口,这样的接口多层嵌套的情况,在这样的情况下,可以借助于flatMap来简化嵌套层次,在flatMap中还可以借助于Observable.fromIterable实现一个发射器功能,即遍历一个数组或者集合,然后按集合的长度进行遍历发射,这样在这个flatMap的后面的观察者就会执行多次。
2.doOnNext
使用doOnNext代替subscribe,使用doOnNext在两个请求的中间进行一次UI更新操作
MyRetrofit.createRetrofit().create(TestApi.class)
.register("947674559qq.com", "123456", "123456")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
// todo 注册完成之后更新ui
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<LoginResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(LoginResponse loginResponse) throws Exception {
Observable<LoginResponse> observable = MyRetrofit.createRetrofit().create(TestApi.class)
.loginWanAndroid(loginResponse.getData().getUsername(), loginResponse.getData().getPassword());
return observable;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {
// todo 如果是使用MVP模式,可以在BaseModel中定义CompositeDisposable,用来保存Disposable
// todo 然后BaseModel实现LifecycleObserver接口,这样在BaseModel中就可以根据注解回调到activity的生命周期onDestroy
// todo 然后在BaseModel对生命周期的回调中mCompositeDisposable?.dispose()
// todo 显示加载中的dialog
}
@Override
public void onNext(LoginResponse loginResponse) {
// todo 登录完成之后更新UI
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
// todo 登录完成之后才会回调complete,关闭dialog
}
});