Rxjava2-线程切换解析

ObservableOn()

直接查看实现,会发现onSubscribe()中做了一些判断,比如82 104等几行都是做了一些同步 异步 等的判断,然后初始化DisposableonSubscribe()是上游Observable完成了整条订阅链之后调用的,所以这些操作是在开始订阅之后才初始化操作,然后106行可以看出把一个包装处理过的Disposable传递给下游
image.png

和之前的一样,subscribeActual方法里会将observer进行包装,然后传递给source也就是上游进行订阅
* `40`行进行了判断所传进来的`scheduler`是否跟原本的线程一致,如果是一样的就直接传递不用进行处理
*  `43`行创建了一个对应`scheduler`的`worker`,`worker`在后续负责把数据在对应的线程进行发射操作
image.png
发射数据onNext处理
        @Override
        public void onNext(T t) {
          ...
          // 前面的都先忽略掉,会发现最后会调用这个方法
            schedule();
        }
        
      void schedule() {
         if (getAndIncrement() == 0) {
         // 在这个可以看到,上面根据schedule的worker执行了schedule(),并且把自身传进去,this其实实现了runnable,所以可以理解为传了一个runnable进去
            worker.schedule(this);
        }
    }
接着上面的以AndroidSchedulers.mainThread()这个scheduler为例,这里实际上是将主线程的looper传进去了
image.png

查看一下这个scheduler的worker,会发现worker的基类schedule()方法是相同的互相调用的,所以可以直接看多个参数的schedule(),可以看到73行创建了一个ScheduledRunnable对象,并且把主线程的handler以及外面的Observer传递过去,接着82行用主线程的handler发送消息,119ScheduledRunnable里的run被调用,接着Observer也就是runnable也调用run方法

image.png
image.png
到这里可以看出,实际上当切换线程的时候,observer(也实现了Runnable)的onNext往scheduler里发送自身,让scheduler来决定自身应该在什么线程执行run方法,接下来看回observer的run方法,就是判断了一下要执行哪个方法
image.png
可以看到最后是调用了onNext方法,到这里就完成了指定线程发射数据的功能
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

值得注意的是可以看到的是v也就是我们要发射的数据,是通过poll方法获取的,查看代码可以发现

queue实际上就是一个Disposable也就是说是上游Observable,通过上游的poll方法去获取要onNext的数据

image.png
查看Observable其中一个实现ObservableMap的poll方法,可以看到这里实际上也是调用上游的poll方法,并且对数据的格式也就是不允许为null做了一层判断
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
poll方法操作的对象实际上是下图104行的时候new出来的,具体查看其实就是缓存数据,类似一个容量池的作用

[图片上传失败...(image-a0f00c-1551301878533)]

image.png
用一段伪代码来展示切换线程之后的observer,其实相当于onNext等方法都被放在指定的线程里去发射数据
public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容