08.RxJava运作流程源码分析

RxJava线程切换非常方便,只要调用subscribeOn(Schedules.io())就可以使前边的操作运行于子线程,调用obsersableOn(AndroidSchedules.mainThread())就可以设置后边的代码运行于主线程,那么是如此神奇,他是如何实现的?

今天就以下边的代码为切入点深入源码看一下

Observable.just("我是网络图片url").map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply1 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply1");
                s = s +" 加上一个时间戳后";
                return s;
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                Log.i(TAG, "apply2 thread:"+Thread.currentThread().getName());
                Log.i(TAG, "apply2");
                s = s +" 加上第二个参数后";
                return s;
            }
        }).subscribeOn(Schedulers.io()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe() {
                Log.i(TAG, "onSubscribe thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(TAG, "onNext thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onNext:"+s+" 开启下载这个图片");

            }

            @Override
            public void onError(@NonNull Throwable throwable) {
                Log.i(TAG, "onError");
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete thread:"+Thread.currentThread().getName());
                Log.i(TAG, "onComplete:下载完成");
            }
        });

程序运行流程图如下


RxJava运行流程图.png

just方法

创建一个ObservableJust对象返回,并将just传入的参数保保存为value

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

@SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

map方法

同样道理,创建一个ObservableMap对象,由于map方法由上边的ObservableJust对象调用,所以构造方法中传入的this表示的就是ObservableJust对象,创建ObservableMap对象后,保存上一级产生的ObservableJust为当前ObservableMap对象中的成员变量source,保存当前function回调接口,这样一来,当前对象持有上一级ObservableJust的引用。不管map调用几次,当前对象都会持有上一级产生的对象的引用

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

假设再次调用map之后,这个map就是由上一次调用map产生的ObservableMap对象调用的,此时会将上一级这个ObservableMap对象保存到当前对象的source成员变量中,就这样,一级套一级

subscribeOn方法

产生一个ObservableSubscribeOn对象,并将上一级的ObservableMap对象保存为当前对象的source变量,保存传入的scheduler,那么这个scheduler是什么?

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

subscribeOn(Schedulers.io())方法使上边的操作在子线程中执行,Schedulers.io()就是上边传入的schedulers,我们看一下schedulers是如何创建的
来到Schedulers类中

public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

可以找到IO对象是在本类静态代码块中创建的

 static {
        ....
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
        ....
    }

IOTask是一个实现了Callable接口的线程

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

线程执行会得到Scheduler,可以看到,这是以内部类形式实现的单例模式

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

可以看到,这个IoScheduler内部是线程池实现的

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

也就是说,当我们在代码中设置了这个操作之后(subscribeOn(Schedulers.io())),会创建一个线程池(如果存在就不必创建),很明显,最终将会需要放在子线程中执行的方法在这个线程池中执行,从而达到切换线程的效果,目前看到这里,这只能作为一个猜想,我们继续往下看

observeOn(AndroidSchedulers.mainThread())方法

这个方法执行会保存一个运行于主线程的Scheduler,这个主线程Scheduler如何创建的?
AndroidSchedulers中

private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

可以看到这个Scheduler是通过封装Handler得到的一个运行于主线程的封装类,这里将它保存起来。最后我们看subscribe方法

subscribe方法

public final void subscribe(Observer<? super T> observer) {
        ......
            subscribeActual(observer);
        ......
    }

protected abstract void subscribeActual(Observer<? super T> observer);

由于subscribeActual方法是抽象的,那么要从其子类中找,subscribe方法由上次操作observeOn方法得到的ObservableObserveOn对象调用,所以会执行这个类中的subscribeActual方法,进入ObservableObserveOn中

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

scheduler就是之前保存的AndroidSchedulers.mainThread对象,这里的source表示的就是上一级产生的Observable对象,具体到当前代码,就是ObservableSubscribeOn,调用ObservableSubscribeOn中的subscribe方法,逐层向上传递,直到传递到ObservableJust对象中,再不断的调用map中传入的function回调方法apply,当apply方法调用完成,再执行Observer的onNext onComplete方法,具体流程见上边的流程图,下一篇博客我将会详细分析线程调度的源码。
到这里,这段示例代码的流程已经走了一遍

写了一个简化版的RxJava,实现了just map subscribeOn obserseOn方法,有助于对原理的理解,GitHub地址:https://github.com/renzhenming/MyRxJava

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容