抱着陌生的态度再看Rxjava(二)

如果你已经有了Rxjava1的使用基础,你可以看一下这一篇的大体的提纲,了解就可以链接到三,说不定三都不用看。。
如果没有Rxjava1的基础,就 定心点,小脚并并拢,坐正了往下看

subscribeOn和observeOn

  • 初见

我们之前在尝试Observable或者是Flowable的subscribe方法时候,有没有在意IDE自动帮我们弹出的方法里有subscribeOn这个鬼,于是我又好奇的点开Observable的源码,搜了下observeOn,结果也有。

看了下注释,比较抽象。按照我们的chinglish直接从方法上来翻译是在什么地方订阅,在什么地方观察。

按照我们之前的上下游观点,上游的人抛东西下来,下游的人接住。那么很显然下游的人肯定是观察的人,这个毋庸置疑。那么相应的上游的人就是订阅者(实际上订阅者还是太费解,我们把它理解成抛东西的人)

那么这两个方法现在按照我们的理解就是在哪里抛东西,和在哪里观察。

OK,我们来看一下这两个方法的传参,都是Scheduler

点进去一看。。。。抽象类。。不能直接用了。。。跪了。。

完。。。。

  • 再探

    就这么结束了??开玩笑!!

    还记得在第一篇中,我们不知道什么东西跟Subscriber一起连用的时候我们怎么做来着!没错!!github,源码目录走起。


    schedulers目录

    我们在根目录下发现schedulers目录,打开,看到Schedulers.java文件,点开来一看

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;
    ...
}

给了我们五个静态类,虽然看了一下,它似乎又在类中维护了这五个Scheduler的单例,但是毕竟是protected的不能直接给哥们儿拿来用啊。

然后一不小心,接着往下看的时候发现了,公有的静态方法,并且返回的还是我们正好需要的Scheduler,我去:

    //处理io
    public static Scheduler io() 
    //处理复杂计算
    public static Scheduler computation() 
    //普通的单独线程
    public static Scheduler single() 
    //新起一个线程
    public static Scheduler newThread()
    //在当前线程中,但是会等到当前线程任务执行完毕之后再去执行
    public static Scheduler trampoline()

相应的注释,我也差不多备注在方法上。

于是我们对我们的代码做一定的改动,然后打上相应的log

flowable.subscribeOn(Schedulers.single()).observeOn(Schedulers.newThread()).subscribe(subscriber);
//当前线程名
Thread.currentThread().getName()

如果你不知道这段代码是获得当前线程的名字,那要么出门左转java线程基础,要么ctrl + W。。。。

我们发现答应出来的log并没有什么问题,和我们预料的一样

02-22 01:05:19.346 31814-31814/org.ding.testmulti E/subscriber: onSubscribe   thread   :    main
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext    :    s  hello flowable1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext   thread   :    RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext    :    s  hello flowable2
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext   thread   :    RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4423/org.ding.testmulti E/flowable: thread   :    RxSingleScheduler-1
subscriber就是我们所说的下游的观察者,由于我们这里使用到的是flowable和subscriber这一对鸳鸯。需要在onSubscribe方法中去调用Subscription的request方法,就很显然的可以理解,在onSubscribe方法中实际上我们还没有开始建立真正的连接,直到request之后我们在onNext,onError,onComplete中才是真正在observerOn的线程中运行的,可以看到两个onNext都是newThread没错。
 subscribeOn就是上游的线程,定义在flowable的subscribe中的方法就是运行在我们定义的singleThread没错。
  • 细思
    • 主线程怎么没有
      主线程是我们使用最最频繁的线程了,所有的UI操作都要放在我们的主线程中去进行,那设想一下,如果我们需要在上游或者是下游做一些UI操作,当然如果我们没有刻意的去使用SubscribeOnObserverOn,而我们的subscribe方法又正好在主线程中调用,那没有问题,整个都是在主线程中跑的,要是我们使用了呢,怎么办呢,Schedulers里面并没有提供主线程这个东西啊,没有main这个东西啊

    我又去找github了,目录翻了一圈,也没有找到MainThread这个鬼,好了,这回真放弃,大家再见。。。。

    醒了醒神,再回过头想想,主线程这个东西,是否是安卓特地适配的呢,rxjava中怎么可能会出现android特有的东西呢。。。

    • 为我们的rxjava添上Android的模块

    说时迟,那时快。。。。(好老)
    我们立马前往ReativeX的github库,看到RxGo,RxKotlin(java8来了你颤抖么),再往下,找到了RxAndroid!!
    点进去,看目录!!

    rxandroid目录结构

    你没有看错,rxandroid里面只有这些。。。
    还正好出现了我们需要的AndroidSchedulers,赶紧吃饱辣条,点击去看看

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

    还要我说废话么。。。
    去module setting里面搜搜看这个dependency,注意哦,rxandroid也有适配rxjava1和rxjava2的两种版本哦,我们使用的是rxjava2的版本
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    * ######上下游的On关系
    不知道细心的你有没有去尝试过一个问题,只用subscribeOn和只用observerOn会出现什么情况。
    通过打印日志,你会发现

          如果只定义了上游的线程,而没有定义下游的线程,那么下游的线程将跟随上游的线程;
          如果只定义了下游的线程,那么上游的线程将依然使用当前线程。
    
    实际上也很好理解,上游发生了海啸洪水必然会影响下游,导致下游也波澜。而下游起了波澜,上游该咋地还咋地。

    * ######Schedulers的几个方法具体区别
    我们在上面大体的把几个都解释了一下,但是具体的什么时候用什么呢,我们一一来看
      * compute
        还记的上面说的维护了静态五个Scheduler么,我们看到compute对应的Scheduler是`ComputationScheduler`,源码走起!
     ```
      /**
        * Create a scheduler with pool size equal to the available processor
        * count and using least-recent worker selection policy.
        */
    public ComputationScheduler() {
            this(THREAD_FACTORY);
    }
      ```
  注释够明显了么,翻译过来大致是说会起一个线程池,大小跟available processor的数量(就是CPU数量)相等,并使用最近的工作线程选择策略。
   也就是说,compute方法会用把方法放在一个大小等于CPU核数的线程池中执行。
     * single
      ```
public SingleScheduler() {
          this(SINGLE_THREAD_FACTORY);
    }
    public SingleScheduler(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          executor.lazySet(createExecutor(threadFactory));
    }
    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
          return SchedulerPoolFactory.create(threadFactory);
    }
我们看到也是创建了一个线程池,我们去create方法继续看,
 public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            if (exec instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
                POOLS.put(e, exec);
            }
            return exec;
    }
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
              super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

不知道在android面试实用版2中,大家有否记得,说道的几个线程池,这里用到到就是计划线程池,主要是用来在未来的某一时刻进行执行的线程池,我们看到传入的是1,也就是说single会让方法在核心线程为1的线程池中工作。

 * io
   ```

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

   使用的一个叫`CachedWorkerPool`的内部类,

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }
     ScheduledThreadPoo线程池,每一个任务相隔60纳秒(好小的样子)。并且维护了一个任务队列

ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
   可以不断的对任务队列进行处理,可以认为这个队列是无限长的,好了。。这边扯得有点远。。
   也就是说io会让方法执行在一个无数量上线的线程池

     * newThread
     newThreadScheduler的代码是最少的,我们直接可以看到
       ```
@NonNull
    @Override
    public Worker createWorker() {
            return new NewThreadWorker(threadFactory);
    }
 每次都会起一个新的线程,也就是说newthread会让方法都在新的线程中执行
  * trampoline
  如注释。就是在当前线程中执行,并且等当前线程执行完了之后再去执行。
  别问我怎么没有源码了,因为我不懂。

 那么综上所述,一般我们使用的最多的应该就是主线程和io了,如果你要使用其他的方法,那么具体任务具体分析了。

电梯

抱着陌生的态度再看Rxjava(一)
抱着陌生的态度再看Rxjava(三)
抱着陌生的态度再看Rxjava(四)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容

  • 怎么如此平静, 感觉像是走错了片场.为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emit...
    Young1657阅读 1,461评论 2 1
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,160评论 6 151
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 2015年,5月14号妈妈车祸去世。 6月14号,男朋友出轨和别的女人上床了。
    念也不去阅读 130评论 0 1