RxJava 2.x 源码分析(三)之一步一步了解切换线程原理

前文

Android 开发离不开异步操作,比如网络请求必须在子线程中进行,获取数据后可能需要更改UI,但是UI只能在主线程中被修改,所以我们经常在子线程和主线程中切换,代码就会比较繁琐不方便阅读。RxJava 的诞生是为了更好的处理异步操作,前面两篇文章我们了解到 RxJava 使用了观察者模式,现在继续了解 RxJava 利用观察者模式同时实现了线程切换。

线程调度

RxJava 中线程调度的核心方法是subscribeOn()observeOn()

  • subscribeOn 控制被观察者发送事件所在的线程。
  • observeOn 观察者接收事件时的线程。

在查看源码之前可以先从下面几个例子中看效果

下面的例子都使用的是相同的被观察者观察者:

  • 不切线程
  • 切换线程,被观察者发射事件操作切换到IO线程中执行,观察者接收事件切换到应用主线程中执行
  • 切换线程,与例2相同,增加调用 FlatMap 操作符

创建被观察者和观察者

// 创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        Log.d("RXJAVA_LOG", "发射事件1 -- " + Thread.currentThread().getName());
        e.onNext("item 1");
        Log.d("RXJAVA_LOG", "发射事件2 -- " + Thread.currentThread().getName());
        e.onNext("item 2");
        Log.d("RXJAVA_LOG", "发射事件3 -- " + Thread.currentThread().getName());
        e.onNext("item 3");
        Log.d("RXJAVA_LOG", "发射事件4 -- " + Thread.currentThread().getName());
        e.onNext("item 4");
    }
});

// 创建观察者
Observer<String> observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.d("RXJAVA_LOG", "接受事件 : " + s + " -- " + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
};

例子1:不切线程

observable.subscribe(observer);

输出:

发射 item1     -- main
接受 item1 -- main
发射 item2     -- main
接受 item2 -- main
发射 item3     -- main
接受 item3 -- main
发射 item4     -- main
接受 item4 -- main

例子2:切换线程

 observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

输出:

发射 item1     -- RxCachedThreadScheduler-1
发射 item2     -- RxCachedThreadScheduler-1
发射 item3     -- RxCachedThreadScheduler-1
发射 item4     -- RxCachedThreadScheduler-1
接受 item1 -- main
接受 item2 -- main
接受 item3 -- main
接受 item4 -- main

例子3:使用FlatMap并切换线程

observable
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            final String str = s;
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    Log.d("RXJAVA_LOG", "发射 " + str + "-sub -- " + Thread.currentThread().getName());
                    e.onNext(str + "-sub");
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

输出:

发射 item1     -- RxCachedThreadScheduler-1
发射 item1-sub -- RxCachedThreadScheduler-1
发射 item2     -- RxCachedThreadScheduler-1
发射 item2-sub -- RxCachedThreadScheduler-1
发射 item3     -- RxCachedThreadScheduler-1
发射 item3-sub -- RxCachedThreadScheduler-1
接受 item1-sub -- main
发射 item4     -- RxCachedThreadScheduler-1
发射 item4-sub -- RxCachedThreadScheduler-1
接受 item2-sub -- main
接受 item3-sub -- main
接受 item4-sub -- main

从上面的例子可以看出:

  • subscribeOn()可以切换被观察者和使用FlatMap或者其他操作符的子被观察者发射的事件的线程。
  • observeOn()可以控制观察者接收事件所在的线程。

那么我们就可以把网络请求操作作为被观察者的事件,并且把发送事件线程切换到IO线程,然后把更新UI操作放到观察者接收事件的回调中,并把接收事件所在线程切换回主线程,这样就可以利用 RxJava 完成我们网络请求的异步操作。


以例子2为阅读源码的例子,根据subscribeOn()、observeOn()、subscribe()的调用顺序分为三步查看

1. subscribeOn()

下面我们开始阅读源码,先从subscribeOn(Schedulers.io())开始看,首先看看io()方法

// Schedulers.java
/**
 * Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
    ...
 */
@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

这里出现了一个变量IO,看看是什么东西

// Schedulers.java
public final class Schedulers {
    @NonNull
    static final Scheduler IO;
    
    static {    
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ...
    }
    ...
}

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

原来变量IO是一个Scheduler对象且是Schedulers的静态成员变量且在类加载的时候已经初始化了,PS:Schedulers可以看成是一个用来创建Scheduler的工厂,提供多个静态方法如io()newThread()computation等来获取适用于不同场景的Scheduler

接着看初始化过程,首先创建了一个IOTask实例,然后传给RxJavaPlugins的初始化方法initIoScheduler()

// RxJavaPlugins.java
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    // 检查 IOTask 是否为空
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
    if (f == null) { // onInitIoHandler 没有被设置,所以为 null
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler);
}

@NonNull
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        // s : IOTask
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}

首先检查IOTask对象是否为空,然后调用callRequireNonNull方法调用IOTask.call(),并返回call()的返回值,即返回IoHolder.DEFAULT,IoHolder又是什么呢,原来是Schedulers的内部类,变量IO的本质原来是IoScheduler对象

// Schedulers.java
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

变量IO的身份我们已经知道了,回到一开始的io()看看RxJavaPlugins.onIoScheduler(IO)做了什么

// RxJavaPlugins.java
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) { // f = null
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}

由于onIoHandler是静态变量且没有初始化,所以为null,直接返回我们的变量IO

所以io()方法最后返回的是IoScheduler对象。

接着看subscribeOn()

// Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
    // 检查是否为 null
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

返回的是ObservableSubscribeOn对象,且保存了上游的被观察者Scheduler对象

// ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); // source: 上游的被观察者
        this.scheduler = scheduler; // 本例子是 IoScheduler
    }
    ...
}

到此,subscribeOn()的方法已经看完了,最终返回一个ObservableSubscribeOn对象。


2. observeOn()

首先看看AndroidSchedulers.mainThread()怎么获取主线程操作的 scheduler

// AndroidSchedulers.java
public final class AndroidSchedulers {
    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
        
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
    });
    ...
}

这里出现了一个静态变量MAIN_THREAD,且由RxAndroidPlugins初始化,代码如下

// RxAndroidPlugins.java
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
    if (scheduler == null) { // false
        throw new NullPointerException("scheduler == null");
    }
    Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
    if (f == null) { // onInitMainThreadHandler 为 null
        return callRequireNonNull(scheduler);
    }
    return applyRequireNonNull(f, scheduler);
}

static Scheduler callRequireNonNull(Callable<Scheduler> s) {
    try {
        // s : 传入 initMainThreadScheduler 的匿名对象
        Scheduler scheduler = s.call();
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    } catch (Throwable ex) {
        throw Exceptions.propagate(ex);
    }
}
  • initMainThreadScheduler()中最终执行callRequireNonNull(scheduler)
  • 把之前传入initMainThreadScheduler()的匿名对象作为 scheduler 传进callRequireNonNull
  • 然后返回scheduler.call()的返回值
  • 根据上面代码返回的是MainHolder.DEFAULTMainHolderAndroidSchedulers的静态类,DEFAULT创建代码如下
// AndroidSchedulers.java
private static final class MainHolder {
    // 通过 Looper.getMainLooper() 获取主线程的 Looper
    // 创建在主线程 Looper 上的 Handler
    // 用这个 Handler 创建 HandlerScheduler 对象
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}

DEFAULT其实就是一个拥有主线程Handler的HandlerScheduler对象,所以AndroidSchedulers.mainThread()最终返回的是HandlerScheduler对象。

下面回归到observeOn()

// Observable.java
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // this : ObservableSubscribeOn
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

从上面代码可以看见observeOn最终返回的是ObservableObserveOn对象,且在我们的例子中这个对象拥有的 Scheduler 是HandlerScheduler


3. subscribe()

到这里我们知道本例子中被观察者ObservableObserveOn

这个方法从之前两篇文章中知道它是一个启动方法,一旦调用,被观察者就开始发送事件给观察者,而且实际执行的方法是被观察者subscribeActual(),还记得在我们的例子中最外层的被观察者ObservableObserveOn所以我们从它的subscribeActual()中开始看起

// ObservableObserveOn.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) { // false
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        // source : ObservableSubscribeOn
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

在第二步我们知道ObservableObserveOn中的 scheduler 是HandlerScheduler,所以走 else ,首先执行了Scheduler.Worker w = scheduler.createWorker();,创建了HandlerScheduler.HandlerWorker对象

// HandlerScheduler.java
@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;
    
    private volatile boolean disposed;
    
    HandlerWorker(Handler handler) {
        this.handler = handler;
    }
    ...
}

在这里先不深入了解HandlerWorker,但是我们可以猜到worker是切换线程的关键。
接着看

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

这里的source是什么?还记得第一步调用subscribeOn()返回的ObservableSubscribeOn对象吗,第二步相当于调用了ObservableSubscribeOn.observeOn()到最后创建ObservableObserveOn对象的时候ObservableSubscribeOn赋值给了source

由于ObservableSubscribeOn没有覆写subscribe()方法,所以source.subscribe()调用的是Observable.subscribe()

// Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // observer : ObserveOnObserver
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

最终会调用ObservableSubscribeOnsubscribeActual()

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    // s : ObserveOnObserver
    // 创建 SubscribeOnObserver 包装了 s
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 调用我们的回调方法 onSubscribe
    s.onSubscribe(parent);
    // scheduler : IoScheduler
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> actual;

    final AtomicReference<Disposable> s;

    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }
    
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
        
    // 省略以下方法
    // onNext onError onComplete dispose isDisposed
    ...
}

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        // source : 我们创建的被观察者
        // parent : SubscribeOnObserver
        source.subscribe(parent);
    }
}

subscribeActual()只有三行代码,首先创建SubscribeOnObserver包装我们创建的观察者,然后调用我们的观察者回调方法onSubscribe(),而最后一行代码

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

是最重要的,这行代码的执行引起了一连串方法调用,激活了被观察者开始发射事件以及后面的一连串的操作,之后我们会在多个类里不停的跳转,见证着一系列动作。

这行代码分三步,但只需要关心前两步,第三步parent.setDisposable是设置Disposable,提供可断开被观察者观察者连接的操作:

1. new SubscribeTask()

创建SubscribeTask,SubscribeTask 继承了 Runnable ,重点看下 run 方法里面是

source.subscribe(parent);

根据前面两篇文章我们知道这行代码会调用Observable.subscribeActual(observer),然后调用ObservableCreate. subscribeActual()

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

最后调用我们创建被观察者时覆写的 subscribe 方法,从而激活了被观察者发射事件

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        ...
});

所以我们可以预知随着代码的深入,SubscribeTask.run()将会在子线程中被调用,从而实现了被观察者在子线程中发射事件。

2. IoScheduler.scheduleDirect()

实际上调用的是IoScheduler父类SchedulerscheduleDirect()

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // w : EventLoopWorker
    final Worker w = createWorker();
    // run : SubscribeTask
    // decoratedRun : SubscribeTask
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        
    DisposeTask task = new DisposeTask(decoratedRun, w);
        
    w.schedule(task, delay, unit);
        
    return task;
}
// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
  • 先创建 worker
  • 把 SubscribeTask 包装后与 worker 组成 DisposeTask
  • 然后执行
w.schedule(task, delay, unit);

时执行的是 EventLoopWorker.schedule(),看看EventLoopWorker

// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) { // false
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

注意到构造方法里成员threadWorker是从CachedWorkerPool缓存池中获取的,EventLoopWorker.schedule()最终会调用NewThreadWorker.scheduleActual

// NewThreadWorker.java
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {  
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {  // delayTime = 0
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

scheduleActual()内不做深入了解,只需要知道会执行下面这行代码

f = executor.submit((Callable<Object>)sr);

executor.submit()内部也不做深入介绍,只需要知道最终会在子线程调用SubscribeTask.run()证实了上文的猜测即可。

到了这里已经到达了我们的被观察者中发射事件的操作了,假设从网络请求中返回了数据可以通过onNext方法发射给观察者,那么接下来就看看在子线程中发射事件之后观察者是如何在主线程中接收事件并作出处理操作的。

// ObservableCreate.java
@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

到这里我们会有疑问这个观察者是哪个?刚刚证实了被观察者发射事件的过程是放在SubscribeTask,现在回顾一下创建SubscribeTask时的代码

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    // s : ObserveOnObserver
    // 创建 SubscribeOnObserver 包装了s
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 调用我们的回调方法 onSubscribe
    s.onSubscribe(parent);
    // scheduler : IoScheduler
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

看完之后我们明白了原来顶层的观察者SubscribeOnObserver,先我们看看它的构造方法,用变量actual保存了ObserveOnObserver对象

// ObservableSubscribeOn.SubscribeOnObserver
SubscribeOnObserver(Observer<? super T> actual) {
    this.actual = actual;
    this.s = new AtomicReference<Disposable>();
}

看看SubscribeOnObserveronNext()

@Override
public void onNext(T t) {
    actual.onNext(t);
}

里面调用了ObserveOnObserver.onNext()

// ObservableObserveOn
@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        // worker : HandlerWorker
        // this : ObserveOnObserver
        worker.schedule(this);
    }
}

ObserveOnObserver内有一个队列 queue 用来保存收到的数据,然后调用schedule(),根据之前所说的观察者的worker是HandlerWorker,前面贴的代码并没有放schedule()这个方法,下面重新贴出来:

先调用父类Scheduler.workerschedule()

// Scheduler.worker
@NonNull
public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

再调用HandlerWorkerschedule()

// HandlerScheduler.HandlerWorker
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");
    
    if (disposed) {
        return Disposables.disposed();
    }
    
    run = RxJavaPlugins.onSchedule(run);
    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    
    return scheduled;
}

其主要操作就是把ObserveOnObserver包装成ScheduledRunnable放进Message,然后通过HandlerWorker绑定的Handler(即主线程Handler)发送到消息队列等待处理。

这里有个疑问就是schedule()接收的是 Runnable 类型的参数,为什么我们可以把ObserveOnObserver传进去呢?之前没有贴完整的ObserveOnObserver代码,现在看看到底是什么东西

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    ...
    @Override
    public void run() {
        if (outputFused) { // false
            drainFused();
        } else {
            drainNormal();
        }
    }
        
    void drainNormal() {
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
    
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
    
                if (checkTerminated(d, empty, a)) {
                    return;
                }
    
                if (empty) {
                    break;
                }
    
                a.onNext(v);
            }
    
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    ...
}

原来ObserveOnObserver也实现了Runnable,且run()里会调用drainNormal,就是把上面用来保存事件的队列 queue 都发送到观察者


到这里我们已经把例子2切换线程的过程看了一遍,之前只是知道简单的调用两个方法就能随意切换线程,现在终于通过例子了解到里面的原理了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352