RxJava基础十一-线程调度

此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

四、使用Scheduler进行线程调度

4.1 什么是Scheduler

RxJava 是一种响应式编程框架,响应式编程就是围绕着异步数据进行的编程,而RxJava就是使用Scheduler来实现异步的。Observable默认是单线程的,而大部分的Observable默认工作在Subscriber调用subscribe方法进行订阅时所在的线程中。但对响应速度很高的UI线程,理想情况是耗时的Observable工作在一个后台线程上,当工作完成后在UI线程上通知Subscriber来更新UI。 RxJava使用subscribeOn和observeOn将Scheduler应用到Observable和Subscriber上,并提供多种不同的Scheduler,分别是io,immediate,trampoline,computation等。此外RxJava还提供了一个from方法,可以根据传入的Executor对象来创建Scheduler。

多次调用了observeOn和subscribeOn,到底Observable和Subscriber会工作在哪个线程里呢?
-多次调用observeOn会影响从其调用位置开始的后面所有操作符合Subscriber。所以如果我们只想更改Subscriber的Scheduler, 应该在所有操作符的后面使用observeOn。
-多次调用subscribeOn时,只有最上面的那个起作用,所以subscribeOn只调用一次就行了,多了没有效果且造成困扰。

private void scheduler() {
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        System.out.println("start:" + Thread.currentThread().getName());
                        subscriber.onNext(1);
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        System.out.println(integer + ":" + Thread.currentThread().getName());
                        return integer + 1;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        System.out.println(integer + ":" + Thread.currentThread().getName());
                        return integer + 1;
                    }
                })
                .subscribeOn(Schedulers.computation())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("action: " + Thread.currentThread().getName());
                    }
                });
    }

运行结果如下,因为最上面的subscribeOn使用的是newThread类型的Scheduler,所以create和map都运行在这个Scheduler上;然后后面observeOn使用的是io类型的Scheduler,所以接下来的map和Subscriber都会运行在io的Scheduler上。注意后面还有subscribeOn使用computation类型的Scheduler,但是因为上面已经有了observeOn操作符,所以computation类型的Scheduler并没起作用。

start:RxNewThreadScheduler-1
1:RxNewThreadScheduler-1
2:RxIoScheduler-2
action: RxIoScheduler-2

我们将observeOn操作符挪到最后,结果是

start:RxNewThreadScheduler-1
1:RxNewThreadScheduler-1
2:RxNewThreadScheduler-1
action: RxComputationScheduler-1

4.2 Scheduler的类型

4.2.1 computation

computation Scheduler适用于和CPU有关的任务,但不适合那些会造成阻塞的任务,如访问磁盘和网络等。这是因为computation Scheduler内部会根据当前运行环境的CPU核心数来创建一个线程池,里面的每个线程会占用一个CPU的核心,从而可以充分利用CPU的计算资源。如果在computation Scheduler上进行阻塞的操作,当前的Scheduler在阻塞的时候还会占用CPU,从而造成资源的浪费。另外,由于CPU的核心数有限,所以computation Scheduler内的线程也是有限的,如果有超出CPU核心数的任务要进行,后来的任务就必须排队等待。所以在使用computation Scheduler时,我们最好也能保证同时进行的任务数量小于CPU的核心数,这样新建的任务会立刻申请到资源并且开始运行。当没有使用Scheduler的时候, 很多和时间相关的操作符,如delay、timer、skip、take、等,所创建的Observable默认就会运行在computation Scheduler上的

4.2.2 newThread

newThread Scheduler每次都会新建一个线程。一般情况下不是很推荐这个Scheduler,这是因为每次新建一个线程都会造成稍微的延迟,而且这个线程在任务结束的时候就会终结,所以也不能重用。newThread Scheduler适合那些工作时间长且总数少的任务,大多数情况下都可以使用io Scheduler来代替newThread Scheduler。

4.2.3 io

io Scheduler类似于newThread Scheduler,不同之处是io Scheduler的线程可以被回收利用。io Scheduler内部也会维持一个线程池,当使用io Scheduler来处理任务的时候, 会先从线程池中查找空闲的线程,如果有空闲线程就会在这个空闲线程上执行任务;否则就会创建一个新的线程来执行任务,并在任务执行完毕时将这个空闲的线程加入到线程池中。当然空闲的线程不会一直在那里等待,RxJava默认空闲线程的存活时间是60秒,空闲时间超过60秒的线程会被回收。

io Scheduler特别适合那些使用很少CPU资源的I/O操作。因为I/O操作一般都会花费比较长的时间来等待网络请求或读取磁盘的返回结果,所以使用一个较大的线程池会比较合适,这样新来的任务就不需要排队等待。io Scheduler所使用的线程池是不限大小的,所有如果有足够多任务同时使用io Scheduler就会导致内存不足(OOM)的错误

4.2.4 immediate

immediate Scheduler会在当前的线程上立即开始执行任务,这会将当前线程上正在进行的任务阻塞。如果用Outer代表当前线程上正在执行的任务,用Inner来代表使用immediate Scheduler的任务,他们的执行顺序会如下所示,很明显这是一种“插队”的策略。

Outer start
Inner start
Inner end
Outer end

4.2.5 trampoline

trampoline Scheduler同immediate Scheduler很像,都会在当前线程上执行任务。但是trampoline并不是立即开始执行任务的,而是等待当前线程上之前的任务都结束之后再开始执行。同样适用Outter 和Inner来分别代表当前线程上的任务和使用trampoline Scheduler的任务:

Outer start
Outer end
Inner start
Inner end 

4.2.6 from

RxJava内置的各种Scheduler可以满足绝大部分使用需求,但是不排除有一些特殊的需求无法被满足,这时候我们可以使用Schedulers.from(Executor executor) 工厂方法来跟进我们提供的Executor创建Scheduler。

首先创建一个类实现ThreadFactory接口,然后新建一个Executor对象,这是核心和最大线程池大小为2,将空闲线程的存活时间设置为2秒,使用LinkedBlockingQueue来作为任务排队序列,使用新建的ThreadFactory来创建新的线程,最后跟进我们创建的Executor对象获取一个Scheduler对象。有了这个Scheduler对象,我们就可以和其它的Scheduler一样使用它了。

private void from(){
        Executor executor = new ThreadPoolExecutor(
                2,
                2,
                2000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(1000),
                new SimpleThreadFactory()
        );
        Observable.interval(0,1, TimeUnit.SECONDS)
                .take(5)
                .observeOn(Schedulers.from(executor))
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println(aLong + "-" + Thread.currentThread().getName());
                    }
                });
    }
结果
0-Thread-1
应该是interval这个方法使用的错误了, 所以才会调用一个。待研究。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容