Android-RxJava源码解析

RxJava3.0已经发布了,但是这里还是以RxJava2.x来分析部分源码。RxJava采用的是响应式编程的原理,采用观察者模式。

一、RxJava案例和流程

        Observable<String> observable = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("hello");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
RxJava调用流程图.png

简易的流程图.png

这是一个标注的RxJava使用例子,其实这个流程就是先从上往下一级一级的封装对应的Observable,然后再从下往上通过Observable.subscribe订阅向上传递Observer观察者,最后ObservableCreate中将Observer进行封装CreateEmitter,调用ObservableOnSubscribe.subscribe(emitter)用于业务中通知,CreateEmitter是ObservableEmitter子类实现,是ObservableCreate的静态内部类

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

从CreateEmitter的内部onNext方法可以看出,其内部就是通过调用Observer.onNext通知观察者结果。其实整个流程来看,Observable向下一层层进行封装,通过钩子函数的方式,而Observer从下向上进行封装传递,而且业务都是在Observer中的onNext执行的,比如.map()中定义的Function,其实就是在MapObserver中的onNext中执行


RxJava处理S型结构.png

二、分析每步的源码

RxJavaPlugins.onAssembly

在具体分析之前,首先先看RxJavaPlugins.onAssembly的具体实现,RxJavaPlugins.onAssembly方法可以说贯穿整个RxJava流程,RxJavaPlugins.onAssembly方法目的就是作为一个钩子函数,将之前的Observable进行封装,变成一个新的Observable。
在上面的例子中,这里的f对象一直都是null。

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

1.Observable.subscribe

所有的Observable以及子类调用subscribe方法时,都是调用Observable.subscribe(),所以在上述流程中,第一个调用subscribe()方法的就是ObservableObServeOn这个Observable子类,ObservableObServeOn是在observeOn()方法中对上一个Observable进行封装创建的。

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

2.ObservableObserveOn

ObservableObserveOn对象其实就是在调用ObserveOn的时候创建的。封装当前的Observable即在observeOn方法调用之前最新创建的Observable

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
// 这里的source其实就是Observable实现类,在上面的例子中,其实就是ObservableSubscribeOn对象
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 封装Observer,然后通过调用上一个Observable.subscribe,向上传递Observer观察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}

在这里创建了一个Scheduler.Worker,Scheduler是一个线程调度类,Worker其实就是针对线程调用的工作者。而Scheduler会通过不同的子类实现,将当前observeOn定义的线程,即让在observeOn之前封装的Observer都在该线程之前(Observer是从下向上传递的)。
ObserveOn与SubscribeOn有点相反,ObserveOn针对的是Observer,即观察者,观察者是从下向上传递封装的,而ObserverOn中接收到的观察者,其实是其下游封装之后的观察者,所以ObserveOn针对的是其下游

具体分析observeOn(AndroidSchedulers.mainThread())
(1)AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
(2)AndroidSchedulers.MAIN_THREAD
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });
(3)AndroidSchedulers.MainHolder.DEFAULT
private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

所以AndroidSchedulers.mainThread()的线程切换到主线程,其实就是交由Handler来实现。HandlerScheduler其实是Scheduler的子类,依然是用来创建Scheduler.Worker实例,然后通过Worker.schedule方法进行线程切换,将之前的线程切换到主线程。

(4)observeOn对应的Observable->ObservableObserveOn

而ObservableObserveOn中也有一个Scheduler实例,这个实例其实就是HandlerScheduler对象。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}
(5)Scheduler.Worker w = scheduler.createWorker();

这里其实就是调用了HandlerScheduler这个Scheduler的createWorkder()方法

// HandlerScheduler.createWorker()
@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
(6)线程切换

而ObserveOnObserver中的线程切换,其实就是调用ObserveOnObserver的schedule()方法实现的,而ObserveOnObserver中的Scheduler.Worker worker对象,是一个HandlerWorker对象。

// ObserveOnObserver类中的部分方法

@Override
public void run() {
    if (outputFused) {
        // 直接执行下游Observer.onNext方法。
        drainFused();
    } else {
        // 从队列中取任务
        drainNormal();
    }
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

这里的this,其实就是ObserveOnObserver对象,而在ObserveOnObserver中,都会将要执行的task放到队列中。而ObserveOnObserver本身就是一个Runnable

(7)HandlerSchedule.HandlerWorker.schedule
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    // 这里的run,其实就是ObserveOnObserver对象
    run = RxJavaPlugins.onSchedule(run);

    // 封装ObserveOnObserver对象,ObserveOnObserver本身就是一个Runnable
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    // 通过Handler执行消息,进而达到切换到主线程的目的
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

所以从上面的代码可以看出,其实ObserveOn的切换任务,首先会接收到会被后一个Observer调用onNext触发调用ObserveOnObserver的onNext()方法(Observer从下向上封装,所以下面是前一个,上面是后一个)然后就会调用ObserveOnObserver的schedule()方法,触发Handler同步执行任务,而封装的Runnable其实就是在其run()方法中调用了ObserveOnObserver的run()方法调用了前一个(下游)的Observer的onNext()方法,将结果转换线程回调给了观察者
所以是针对ObserveOn下游代码的。

3.ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
}

我们知道,subscribeOn只有设置第一次有效,这其实是因为SubscribeOnObserver.setDisposable方法中,调用的是DisposableHelper.setOnce(this, d);方法

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        ...

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

而DisposableHelper.setOnce方法的实现如下:

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

而当前subscribeOn对应的Observable即ObservableSubscribeOn中的subscribeActual方法中,针对之前传进来的Observer做了封装,而这里的source其实就是在subscribeOn之前的Observable,所以在subscribeTask中的run方法中调用source.subscribe(parent);其实就是将subscribeOn之前的逻辑运行的线程切换到了subscribeOn指定的线程。而subscribeOn后面部分的代码,如果没有指定线程切换,都是在subscribeActual中调用source.subscribe()的,所以并不会在subscribeOn指定的线程中执行

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

如果调用多次的subscribeOn,其实每次的线程切换都会生效,但是最终只有第一个调用的subscribeOn会生效的,这个原因其实就是subscribeOn是切换其上游的线程,而subscribeOn线程切换,其实切换的就是source.subscribe(parent)所在的线程,如果create().subscribeOn,那么subscribeOn切换的就是create()中的source(即ObservableCreate)所在线程。subscribeOn创建对应的ObservableSubscribeOn这个Observable是从上向下的,但是调用subscribeActual,封装Observer是从下向上的,所以就算多次使用subscribeOn进行线程的切换,最终只有第一个subscribeOn生效,即最后被调用subscribeActual的ObservableSubscribeOn生效了。

subscribeOn与ObserveOn不同的是,ObserveOn切换的是Observer的线程,而subscribeOn切换的是Observable的线程。所以,subscribeOn是影响上游的操作,而observeOn影响的是其下游的操作

image.png

看这个图,其实ObservableSubscribeOn就是subscribeOn创建的对应的Observable,在这个Observable的subscribeActual方法中,其实就是线程切换执行任务,而对应的Runnable.run方法中调用的其实就是source.subscribe(),即ObservableSubscribeOn内部封装的Observable.subscribe线程做了切换调度,而ObservableSubscribeOn内部封装的Observable是subscribeOn上游创建的Observable
image.png

针对parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));做分析
(1)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
(2)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 创建工作对象,内部通过线程池执行runable任务,这个任务其实就是SubscribeTask
    // 创建Scheduler.Worker实例,其目的就是为了创建EventLoopWorker实例以及其内部的ThreadWorker实例
    // 最终的线程池调用就是通过ThreadWorker内部的线程池来进行,这样就将任务交给了ThreadWorker中的线程池
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // 封装Runable,其实还是一个Runbale,只不过多实现了Disposable接口
    // 这是为了可以在中断任务的时候,将这个异步执行的任务中断
    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

这里的createWorker()方法,其实是调用的IoScheduler.createWorker方法。

(3)IoScheduler.createWorker
@NonNull
@Override
public Worker createWorker() {
    // 创建Scheduler.Worker实例对象,这时会构造器内部创建ThreadWorker实例
    // ThreadWorker实例是从CachedWorkerPool实例调用get()方法创建的
    return new EventLoopWorker(pool.get());
}

这里的pool是一个AtomicReference<CachedWorkerPool>对象,get()得到的是一个线程池包装类。

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            // 如果cached中没有ThreadWorker,则会创建一个,并且在ThreadWorker构造器中会创建线程池ScheduledExecutorService executor;
            // 这个是因为ThreadWorker构造器执行父类构造器的时候创建的
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
...
}
(4)第三步里的w.schedule其实就是调用IoScheduler中的内部类EventLoopWorker.schedule
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

而threadWorker其实就是通过CachedWorkerPool.get()得到的。
而threadWorker其实就是ThreadWorker,是IoScheduler的内部类,是NewThreadWorker的子类实现。所以这里调用scheduleActual,其实就是调用NewThreadWorker.scheduleActual

(5)NewThreadWorker.scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        // executor其实就是线程池对象。sr就是要执行的任务,
        // executor这个线程池是在创建ThreadWorker的时候初始化创建的
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

而任务中,其实就是ObservableSubscribeOn的静态内部类实现对象,SubscribeTask。而SubscribeTask这个Runnable的run方法中是调用了source.subscribe(),这个source其实就是在subscribeOn之前封装的Observable实例,所以这里的线程池异步调用的时候,其实就是执行subscribeOn之前的Observable.subscribe过程,所以subscribeOn是针对代码上游的线程切换。
但是Observer.onSubscribe并不会为异步,subscribeOn的线程切换不会针对onSubscribe,调用subscribe的Observable在什么线程,则onSubscribe就在什么线程中执行。而onError和onComplete都是会因为线程切换而影响。

4.ObservableMap

其实map操作符,内部创建的是ObservableMap,会传入Function对象,而Function对象是被封装在MapObserver这个Observer中的,当从上游向下调用发送Observer.onNext的时候,就会在MapObserver中触发Function中的操作。

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        ...
    }

5.ObservableCreate

这里就看关键部分的源码,即subscribeActual的实现

    public final class ObservableCreate<T> extends Observable<T> {
  // ObservableCreate类 = Observable的子类 

      ...
      // 仅贴出关键源码

        final ObservableOnSubscribe<T> source;

        // 构造函数
      // 传入了传入source对象 = 手动创建的ObservableOnSubscribe对象
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        
    /** 
      * 重点关注:复写了subscribeActual()
      * 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
      * 该方法,是在被观察者调用subscribe()方法与观察者绑定的时候,调用的。
      **/
        @Override
        protected void subscribeActual(Observer<? super T> observer) {

            // 1. 创建1个CreateEmitter对象(封装成1个Disposable对象)
            // 作用:发射事件
            // CreateEmitter类中是对观察者的一个封装类,用于被观察者变化时向观察者分发事件
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);

            // 2. 调用观察者(Observer)的onSubscribe()
            // onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
            // Observer对象的onSubscribe方法实现
            observer.onSubscribe(parent);

            try {
                // 3. 调用source对象的subscribe()
                // source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象 
                // subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()->>分析2
                // 这里调用的,其实就是在Observable.create方法中,
                // 实现ObservableOnSubscribe接口的时候,实现的subscrebe方法
                source.subscribe(parent);

            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
    }

从这部分代码可以看出,在ObservableCreate.subscribeActual内部会调用ObservableOnSubscribe.subscribe,但是在调用这个方法之前,会调用观察者的onSubscribe()方法,其实就是事件开始。

三、操作符分析

1.分析map和flatMap的区别

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

要分析两个操作符的区别,首先看源码。
比如在map和flatMap两个操作符中,分别做网络请求,那么在map中,就会有问题。
因为如果在flatMap中执行Retrofit请求接口,返回的是一个Observable<T>,那么可以看出,如果是使用flatMap的话,则flatMap中的泛型<R>其实是接口请求的返回值的Observable<T>的泛型T,而如果是使用map的话,那么map中的泛型<R>其实就是接口请求返回值Observable<T>,那么map的返回值就会变成Observable<Observable<T>>,这结构出现了变化
所以map主要是用来做数据类型的转换的,从一个数据类型转为另外一个数据类型,而不会影响这个数据类型在Observable的,比如转换之前是Observable<T>,转换之后变成Observable<R>,依然是Observable结构的,转换的只是Observable中的泛型的类型。
而flatMap的话,可以使用Observable的泛型类型数据,得到一个新的Observable,然后使用这个新的Observable替代了旧的Observable

flatMap在实际应用场景中,可能会出现一个接口的请求你数据需要借助于前一个接口,这样的接口多层嵌套的情况,在这样的情况下,可以借助于flatMap来简化嵌套层次,在flatMap中还可以借助于Observable.fromIterable实现一个发射器功能,即遍历一个数组或者集合,然后按集合的长度进行遍历发射,这样在这个flatMap的后面的观察者就会执行多次。

2.doOnNext

使用doOnNext代替subscribe,使用doOnNext在两个请求的中间进行一次UI更新操作

MyRetrofit.createRetrofit().create(TestApi.class)
        .register("947674559qq.com", "123456", "123456")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(new Consumer<LoginResponse>() {
            @Override
            public void accept(LoginResponse loginResponse) throws Exception {
                // todo 注册完成之后更新ui
            }
        })
        .observeOn(Schedulers.io())
        .flatMap(new Function<LoginResponse, ObservableSource<LoginResponse>>() {
            @Override
            public ObservableSource<LoginResponse> apply(LoginResponse loginResponse) throws Exception {
                Observable<LoginResponse> observable = MyRetrofit.createRetrofit().create(TestApi.class)
                        .loginWanAndroid(loginResponse.getData().getUsername(), loginResponse.getData().getPassword());
                return observable;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<LoginResponse>() {
            @Override
            public void onSubscribe(Disposable d) {
                // todo 如果是使用MVP模式,可以在BaseModel中定义CompositeDisposable,用来保存Disposable
                // todo 然后BaseModel实现LifecycleObserver接口,这样在BaseModel中就可以根据注解回调到activity的生命周期onDestroy
                // todo 然后在BaseModel对生命周期的回调中mCompositeDisposable?.dispose()
                // todo 显示加载中的dialog
            }

            @Override
            public void onNext(LoginResponse loginResponse) {
                // todo 登录完成之后更新UI
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                // todo 登录完成之后才会回调complete,关闭dialog
            }
        });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343