RXJava的使用与源码解析

RxJava 到底是什么

一个词:异步。

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

先来看一段代码

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

如果是rxJava的话,实现方法是这样的

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

从上面的代码中,我们可以看到,rxJava的代码,要比New Thread的代码要长。RXJava的代码可读性,要比new Thead要强很多

在rxJava中需要知道三个对象
1.Observer 观察者
对于观察者而言,是对事件做出处理后的回调。Observer回调包含onSubscribe注册时回调,onNext事件到达时回调,onError抛出异常时回调,onComplete事件完成时回调。
2.Observable 被观察对象
由该对象开始订阅,并对事件进行一系列的变换处理。最终该事件到达观察者的回调中。
3.subscribe() 订阅
观察者订阅被观察者,订阅的时候会执行ObservableOnSubscribe中的subscribe订阅函数。
ObservableEmitter 发射器
emitter.onNext();通过发射器发送一个事件

   // 1、创建观察者
        final Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
              //被订阅之后,会先执行这个方法
                Log.e(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                //被观察者调用onNext
                Log.e(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
          //被观察者调用onError
                Log.e(TAG, "error");
            }

            @Override
            public void onComplete() {
//被观察者调用onComplete
                Log.e(TAG, "complete");
            }
        };

        //2、创建被观察者
        Observable<Integer> observable =  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                ///4、发射相应事件
                emitter.onNext(“hello”);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });

        ///3、创建订阅者,将观察者与被观察者关联起来
        observable.subscribe(observer);

输出为
subscribe
hello
2
complete

被观察者Observable 的第二个用法just

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被观察Observable 者的第三个用法from

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被观察Observable 者的第四个用法fromArray

Observable observable = Observable.fromArray("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被观察Observable 者的第五个用法range

Observable<Integer> observable =  Observable.range(1,3);
// 将会依次调用:
// onNext(1);
// onNext(2);
// onNext(3);
// onCompleted();

被观察Observable 者的还有第六个用法interval

    @CheckReturnValue
    @SchedulerSupport("io.reactivex:computation")
    public static Observable<Long> interval(long period, TimeUnit unit) {
        return interval(period, period, unit, Schedulers.computation());
    }
observable = Observable.interval(10, TimeUnit.SECONDS);//间隔10秒
输出日志
16:03:31.359 3518-3518/com.example.testrxjava E/MainActivity: subscribe
16:03:41.363 3518-3538/com.example.testrxjava E/MainActivity: 0
16:03:51.361 3518-3538/com.example.testrxjava E/MainActivity: 1
16:04:01.361 3518-3538/com.example.testrxjava E/MainActivity: 2
16:04:11.361 3518-3538/com.example.testrxjava E/MainActivity: 3
16:04:21.362 3518-3538/com.example.testrxjava E/MainActivity: 4
16:04:31.361 3518-3538/com.example.testrxjava E/MainActivity: 5

被观察Observable 者的还有第七个用法timer

observable = Observable.timer(10, TimeUnit.SECONDS);
输出日志为
 16:02:18.312 3433-3433/com.example.testrxjava E/MainActivity: subscribe
 16:02:28.315 3433-3453/com.example.testrxjava E/MainActivity: 0
 16:02:28.316 3433-3453/com.example.testrxjava E/MainActivity: complete

Rxjava的Disposable

rxjava虽然好用,但是总所周知,容易遭层内存泄漏。也就说在订阅了事件后没有及时取阅,导致在activity或者fragment销毁后仍然占用着内存,无法释放。而disposable便是这个订阅事件,可以用来取消订阅。

public interface Disposable {
    void dispose();//中断订阅事件

    boolean isDisposed(); //判断时间是否中断的
}

关于Scheduler的线程操作

默认情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

        observable.subscribeOn(Schedulers.io()); // 指定onSubscribe方法执行的线程为 IO线程
        observable.observeOn(AndroidSchedulers.mainThread());// 指定observer回调方法执行的线程为 android 主线程

观察者的源码分析

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

以上代码可以看出观察者实际上就是一个接口,接口的使用在上面已结介绍过了

接下来是被观察者源码分析

我们从被观察者的create()方法看起

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判断是不是空的source对象
 //传入Observable和Function对象,返回ObservableMap对象(hook功能默认没有开启)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

requireNonNull方法里就是判断source是否为空,如果是空的就会抛出一个空指针异常,这个可以忽略,我们发现,ObservableMap的subscribeActual直接调用了source的subscribe函数,现在由两个问题,第一是source是什么?答案就是我们在上一步(这里是第一步)传入的ObservableCreate对象,就是说这里调用了ObservableCreate的subscribe函数。第二参数是什么呢,参数是MapObserver,它把我们原来的Observer对象t包装成MapObserver对象,我们现在去看看ObservableCreate的subscribe,发现它并没有实现,而是复用了父类Observable的subscribe,就是我们上面看到的那一段代码,所以直接看ObservableCreate的subscribeActual函数即可:


public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {//这个new ObservableCreate<T>(source)
        this.source = source;
    }
@Override
protected void subscribeActual(Observer<? super T> observer) {
    //创建了一个发射器对象
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回调Observer实例的onSubscribe函数。这里是MapObserver
    observer.onSubscribe(parent);

    try {
        //回调订阅,此处为ObservableCreate订阅ObservableOnSubscribe
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

...
  }

这个对象的构造函数中,就是传入了source这个参数

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

    static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

看到这里,感觉像是在onAssembly里直接把source变量直接给返回了
下面是我看别人写的帖子中写到的,应该是没有什么代码上的处理逻辑了
这是一个hook实现,关于hook,可以理解为这是一个抽象代理,这个代理默认情况下不会对咱们的形参Observable做任何的处理,但是如果开发者想要对Observable做处理,可以调用RxJavaPlugins的SetonObservableAssembly()设置开发者自己实现的代理,从而替换原Observable,最后真正返回的是Observable的实现类ObservableCreate类的实例对象。在这里咱们没做任何处理,所以返回默认的Observable实现类ObservableCreate。至此创建完了一个被观察者对象Observable。

subscribe方法

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

ObjectHelper.requireNonNull(observer, "observer is null")与observer = RxJavaPlugins.onSubscribe(this, observer)这两个方法与上面的被观察者中的方法一样就不在介绍
接下来我们看subscribeActual(observer);这个方法

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);//建立订阅关系,此时在观察者中会介绍到onSubscribe的消息

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
...

我们可以看到代码中创建了一个类似发射器的东西CreateEmitter,CreateEmitter是在ObservableCreate类中的一个内部静态类,他实现了ObservableEmitter的方法
source.subscribe(parent)中source就是在创建被观察者对象时传入的ObservableOnSubscribe对象实例,调用其subscribe方法,将上游事件发送对象(ObservableOnSubscribe)和下游接收对象(Observer)关联起来。
大致流程是这样的:
(1)通过Observable.create创建了一个ObservableCreate对象,这个对象保存了我们实现的匿名实现类ObservableOnSubscribe。
(2)通过map,创建了一个ObservableMap对象,这个对象保存了(1)中的ObservableCreate对象。
(3)通过subscribe,实现了ObservableMap对Observer的订阅(不要奇怪是不是反了,这样是为了连续性)。
(4)在ObservableMap实现了对Observer的订阅时,内部会调用保存的ObservableCreate对象对MapObserver对象进行订阅(构造MapObserver对象,会将Observer保存在MapObserver的成员变量actual中)
(5)在ObservableCreate对象实现了对MapObserver的订阅时,ObservableCreate保存的ObservableOnSubscribe对象会对CreateEmitter对象进行了订阅(构造CreateEmitter对象时,保存了MapObserver)

RXjava线程调度

下面是我从别的地方盗过来的一张图,忘了是从哪找的了,完美解释线程的切换


2814211-cde17013310e2674.png

我们来看切换线程用的方法subscribeOn
observable.subscribeOn(AndroidSchedulers.mainThread())

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

又是熟悉的两个方法,我们忽略,看new ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
...

这里是不是也是很熟悉,根据上面的subscribe方法的经验,最后肯定也会执行到subscribeActual方法,所以我们直接看subscribeActual方法
我们来看parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));这句

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
...
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
...

SubscribeTask 是ObservableSubscribeOn的内部类 里面的run方法,执行了source.subscribe(parent)订阅操作,source为ObservableSubscribeOn执行构造方法是传入
接着我们来看scheduleDirect方法

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

  接着查看scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);方法
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

createWorker()是一个抽象方法
于是搜索creatteWorker方法结果如下


搜索createWorker.png

可以看到里面有IoScheduler 有newThreadScheduler有ImmediateHginScheduler等类,这些在Scheduler类中都有似曾相识的感觉,于是我猜测在public final Observable<T> subscribeOn(Scheduler scheduler) 方法中
scheduler变量,比如传入的是Scheduler.IO就会调用IOScheduler的createWorker方法

public final class IoScheduler extends Scheduler {
...
final AtomicReference<CachedWorkerPool> pool;
...
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
...
 static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
...
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
}

这里就是核心所在啦,步骤如下:
(1)创建了一个Worker对象。
(2)没有hook,返回原Runnable对象,就是我们上面的SubscribeTask对象。
(3)将Worker对象和Runnable对象封装到DisposeTask中。
(4)调用worker对象的schedule函数。
重点就在于threadWorker.scheduleActual,threadWorker通过CachedWorkerPool的get函数获取:
···

ThreadWorker get() {
     if (allWorkers.isDisposed()) {
         return SHUTDOWN_THREAD_WORKER;
     }

    //当缓存不为空时,优先从缓存中获取
     while (!expiringWorkerQueue.isEmpty()) {
         ThreadWorker threadWorker = expiringWorkerQueue.poll();
         if (threadWorker != null) {
             return threadWorker;
         }
     }

     // 没有缓存,重新构建
     ThreadWorker w = new ThreadWorker(threadFactory);
     allWorkers.add(w);
     return w;
}

所以这里调用的是ThreadWorker的scheduleActual,但是它本身并没有实现而是其父类NewThreadWorker实现的

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //没有hook,返回原Runnable对象
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //封装Runnable对象和订阅容器对象
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    ...

    Future<?> f;
    try {
        //重点
        if (delayTime <= 0) {
            //没有延时
            f = executor.submit((Callable<Object>)sr);
        } else {
            //延时
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

我们看看executor是啥,原来是线程池的实现类啊,现在有了线程池实例,有了Runnable,不用问,肯定是将Runnable添加到线程池的工作队列中执行,即会调用Runnable的run方法。回到上面,去掉层层包装,我们看下最初始的那个Runnable:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        //调用source的订阅方法,此处source为上游的ObservableCreate对象
        source.subscribe(parent);
    }
}

到这里一个subscribeOn的流程已经完成了。

接下来我们看下observeOn(Scheduler scheduler)方法

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看着与之前的套路一样,我们接着看ObservableObserveOn类


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //不会走这里
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
           //创建Worker
            Scheduler.Worker w = scheduler.createWorker();
           //注释1
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
...
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
...
        @Override
        public void onNext(T t) {
            if (done) {//控制不会二次执行
                return;
            }
 //默认是0,不等于 QueueDisposable.ASYNC,所以将发射的T对象入队
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        @Override
        public void onError(Throwable t) {
            if (done) {//控制不会二次执行
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {//控制不会二次执行
                return;
            }
            done = true;
            schedule();
        }
...
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
...
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
...

在上面代码中,在onNext,onError,onComplete方法中都调用了schedule()方法,在这些方法中都没有做什么特殊处理。里面习性worker.schedule(this)。代码与上面的subscribeOn中类似,同样都是通过Work进行调度与切换。
我们这里传入的是AndroidSchedulers.mainThread(),它的本质就是一个HandlerScheduler,我们就看HandleerScheduler中的schedule方法

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

看着这里明白了,原来是通过handle发送到android主线程。
接下来我们继续回到ObservableObserveOn类中继续看他的run方法

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        //走这里
        drainNormal();
    }
}

我们发现drainNormal就是从队列中取出参数T,然后做了一些检查,最后调用其onNext,而此时,因为经由Hanlder回调在主线程了。不过你可能会感到奇怪,这里并没有onError和onComplete的逻辑啊,不要急,我们回想一下刚才ObserveOnObserver的onComplete方法,它里面主要做了两个操作:
(1)将done置为true
(2)同样回调schedule
接下里我们看这里

if (checkTerminated(done, q.isEmpty(), a)) {
    return;
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            //d为done,即true
            if (d) {
                Throwable e = error;
                //delayError为false
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    //此处我们回调的是onComplete,所以e为null
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        //因为此时onNext任务队列为空,所以走到这
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

到现在已经很明朗了,执行完全部的onNext,在回调onComplete时,返回true,所以drainNormal后面相关代码就不再执行,因为已经return了。onError也是同样的道理。同时,我们在上面代码中还发现一个问题,那就是我一回调完onComplete就把worker给dispose了,所以后面如果我们继续调用onError就不会继续执行了,因为已经停止订阅。
到这里我们就可以总结一下subscribeOn和observerOn的使用:
(1)subscribeOn只对上游有效,因为是在订阅过程中传递的,如果有多个,那么只有第一个”生效”(其实对于传递订阅关系都生效了,只是最终事件发射只体现出了最上游subscribeOn的作用)
(2)observerOn只对下游有效,因为它是在事件发射出来之后,回调事件的过程中生效的

参考自

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容