RxJava 2.0----你不知道的Schedulers

一.引言

RxJava中 使用observeOn(Schedulers s)和subscribeOn(Schedulers s)是任务调度的操作符。subscribeOn(Schedulers s) 指示Observable将任务(数据的发射以及数据的处理)放在指定的调度器上的执行 ,observeOn(Schedulers s)指示一个Observer在一个指定的调度器上调用onNext, onError和onCompleted等方法。即subscribeOn决定任务的发射的线程,observeOn决定任务的接收线程。

二.Schedulers任务调度

先看看下面的例子,先猜猜任务调度执行的线程:

//-------------------------- 1   默认主线程-----------------
tx_console.setText("");
printThread("1 默认主线程 ");

Observable.create(observableOnSubscribe)
        .flatMap(function)
        .subscribe(consumer);

//---------------------------2   指定 Observable 的调度器---------------
printThread("2  指定 Observable 的调度器");
Observable.create(observableOnSubscribe)
        .flatMap(function)
        .subscribeOn(Schedulers.newThread())
        .subscribe(consumer);

//---------------------------3    默认新线程---------------
new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printThread("  默认新线程");
        Observable.create(observableOnSubscribe).flatMap(function).subscribe(consumer);
    }
}).start();

//---------------------------    新线程 指定  Observable ,Observer的调度器---------------

new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printThread("    新线程 指定  Observable ,Observer的调度器");
        Observable.create(observableOnSubscribe).flatMap(function)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);
    }
}).start();
Observable.timer(4,TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                printThread("    timer 调度器");
            }
        });


结果输出:

结果输出

从上面的输出结果中,我们大概知道了下面几点:

  1. RxJava中不同的调度器可以指定在不同的线程中执行 。
  2. Create创建的Observable默认在当前线程中执行任务流,并在当前线程观察
  3. 没有调用observeOn指定观察者调度器,观察者默认在Observable发射线程里执行
  4. timer创建的Observable会在一个叫Computation的线程中执行任务流
  5. 除了observeOn和subscribeOn ,使用其他创建或者变换操作符也有可能造成线程的切换

三. subscribeOn()原理

subscribeOn()用来指定Observable在哪个线程中执行事件流, 通过源码分析subscribeOn可以知道是Observable怎样实现线程的切换的。

1.subscribeOn方法,创建一个 ObservableSubscribeOn对象

 public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

2.ObservableSubscribeOn实现subscribeActual()方法, 保证当前Observer的Disposable 只设置一次,后来的设置无效,并且在指定的调度器重新订阅

   @Override
public void subscribeActual(final Observer<? super T> s) {
//  parent表示目标观察者
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        //静态对象里的常量,保证初始化一次
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
          //保证当前Observer的Disposable 只设置一次,后来的设置无效
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
       //保证当前Observer的Disposable 只设置一次,后来的设置无效
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
           //指定的调度器重新订阅
            source.subscribe(parent);
        }
    }

3. 调度器的scheduleDirect()指定调度器在自己的线程池(Worker)执行任务

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

4.IoScheduler ---IO调度器的createWorker()

  class IoScheduler extends Scheduler {
final AtomicReference<CachedWorkerPool> pool;
static {
    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
}

@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
}
 static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

四. observeOn()原理

observeOn()指示一个观察者 observer 在指定的调度器上调用onNext, onError和onCompleted等方法。先来看看源码的实现过程:
1.observeOn()方法,创建 ObservableObserveOn对象

   @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

2.ObservableObserveOn实现方法subscribeActual,如果是TrampolineScheduler, TrampolineScheduler调度器表示在当前事件流的线程执行任务 ,否则在指定调度器的线程执行任务

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
      if (scheduler instanceof TrampolineScheduler) {
         // TrampolineScheduler调度器表示在当前事件流的线程执行任务 
          source.subscribe(observer);
      } else {
          Scheduler.Worker w = scheduler.createWorker();
          source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
      }
  }

3.ObserveOnObserver在调度器工作线城池中执行onNext(),onError(),onComple()

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
   implements Observer<T>, Runnable {
    //目标观察者
     final Observer<? super T> actual;
    //调度器工作线城池
      final Scheduler.Worker worker;
     //数据缓冲队列
     SimpleQueue<T> queue;

       @Override
       public void onSubscribe(Disposable s) {
           if (DisposableHelper.validate(this.s, s)) {
               this.s = s;
               ......
               queue = new SpscLinkedArrayQueue<T>(bufferSize);
               actual.onSubscribe(this);
           }
       }

       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);//数据缓存入队列
           }
           schedule();
       }

       @Override
       public void onError(Throwable t) {
           if (done) {
               RxJavaPlugins.onError(t);
               return;
           }
           error = t;
           done = true;
           schedule();
       }

       @Override
       public void onComplete() {
           if (done) {
               return;
           }
           done = true;
           schedule();
       }

       void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);//调用当前Run()方法
           }
       }
      //
       void drainNormal() {
           int missed = 1;

           final SimpleQueue<T> q = queue;
           final Observer<? super T> a = actual;

           for (;;) {
               if (checkTerminated(done, q.isEmpty(), a)) {
                   return;
               }

               for (;;) {
                   boolean d = done;
                   T v;
                   try {
                       v = q.poll(); //冲缓冲队列取数据
                   } catch (Throwable ex) {
                       Exceptions.throwIfFatal(ex);
                       s.dispose();
                       q.clear();
                       a.onError(ex);
                       worker.dispose();
                       return;
                   }
                   boolean empty = v == null;

                   if (checkTerminated(d, empty, a)) {
                       return;
                   }

                   if (empty) {
                       break;
                   }

                   a.onNext(v);//数据接收
               }
........
           }
       }

       void drainFused() {
              .......
               actual.onNext(null);
              ......
       }
       //
       @Override
       public void run() {
          //是否需要丢弃
           if (outputFused) {
               drainFused();
           } else {
               drainNormal();
           }
       }
}

五.实例演示

根据源码的分析,我们来看看下面的例子,就不难理解了。

 //---------------------------1   同时指定多个 Observable 的调度器---------------
        Observable.create(observableOnSubscribe)
                .flatMap(function)
                .subscribeOn(Schedulers.newThread())//Observable new 线程发射数据
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.computation())
                .subscribe(consumer);//最终observer在new 线程接收数据
        
        //---------------------------2   同时指定多个 Observable 的调度器---------------
        Observable.timer(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        printThread("2   同时指定多个 Observable ,observer 的调度器");
                        Observable.create(observableOnSubscribe).flatMap(function)
                                .subscribeOn(Schedulers.newThread())
                                .subscribeOn(Schedulers.io())
                                .subscribeOn(Schedulers.computation())//最终 Observable在 new 线程发射数据
                                .observeOn(Schedulers.newThread())
                                .observeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(consumer);//最终observer 在 main 线程接收数据
                    }
                });
 //--------------------------- 3   切换 Observable ,Observer的调度器---------------
       create(observableOnSubscribe)
                .flatMap(function)//new 1 线程接收数据
                .subscribeOn(Schedulers.newThread())//new 1线程发送数据
                .flatMap(function)//new 1 线程接收数据
                .subscribeOn(Schedulers.io())//设置无效
                .observeOn(Schedulers.newThread())//切换new 2线程接收数据
                .flatMap(function)//new 2 线程接收数据
                .subscribeOn(Schedulers.computation())//设置无效
                .flatMap(function)//new 2 线程接收数据
                .observeOn(Schedulers.io())//切换io线程接收数据
                .subscribe(consumer);//目标Observer 在io线程中接受数据 
输出结果
//---------------------------4   调用Observable 的操作符
    create(observableOnSubscribe)//io线程发送数据
                .flatMap(function)//io线程接收数据
                .subscribeOn(Schedulers.io())//指定io线程发送数据
                .flatMap(function)//io线程接收数据
                .observeOn(AndroidSchedulers.mainThread())//切换mian线程接收数据
                .flatMap(function)//mian线程接收数据
                .delay(2, TimeUnit.SECONDS)//delay操作符在Computation线程中接受数据
                .subscribe(consumer);//目标Observer 在Computation线程中接受数据 
输出结果

嗖嘎

六.调度器的种类

RxJava中可用的调度器有下面几种:

Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.single() 该调度器的线程池只能同时执行一个线程。
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline() 当其它排队的任务完成后,在当前线程排队开始执行。
AndroidSchedulers.mainThread( ) 主线程,UI线程,可以用于更新界面
 //---------------------------5  对比  切换 Observable trampoline调度器---------------
        printThread("    切换 Observable io调度器");
        Observable.just("1","2","3","4","5")
                .subscribeOn(Schedulers.newThread())
                .flatMap(function)
                .observeOn(Schedulers.io())
                .flatMap(function)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer);

输出结果:

输出结果
  Observable.timer(3,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        printThread("    切换 Observable trampoline调度器");
                        Observable.just("1","2","3","4","5")
                                .subscribeOn(Schedulers.newThread())
                                .flatMap(function)
                                .observeOn(Schedulers.trampoline())
                                .flatMap(function)
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(consumer);
                    }
                });

输出结果:

输出结果

六.各种操作符的默认调度器

总结了一些操作符默认的调度器:

buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan,timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retrytrampolinesample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) trampoline
skipLast(long time, TimeUnit unit, boolean delayError) trampoline
sample(long period, TimeUnit unit) computation
sample(long period, TimeUnit unit, boolean emitLast) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) trampoline
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval computation
timeout(timeoutSelector) computation
timeout(firstTimeoutSelector, timeoutSelector) computation
timeout(timeoutSelector, other) computation
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) computation
timeout(timeout, timeUnit, other) computation
timer computation
timestamp computation
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

最后,小伙伴们,有么有觉得是干货,是的话,就为我点个赞吧!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容