RxJava2 原理源码分析(一)

Rxjava 框架结构

RxJava 的整体结构是一条链,其中:

  1. 链的上游:生产者 Observable
  2. 链的最下游:观察者 Observer
  3. 链的中间:各个中介节点,既是下游的 Observable,又是上游的 Observer
    我们从一段最简单的 RxJava 代码来展开,即 Single。

首先说明一下:本片博客所看Rxjava2的源码版本为 [rxjava:2.2.9]

Single 分析

Single.just("1")
        .subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onSuccess(String s) {
                Log.e(TAG, "onSuccess: s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                String errMsg = e.getMessage();
                if (TextUtils.isEmpty(errMsg)) {
                    errMsg = e.getClass().getName();
                }
                Log.e(TAG, "onError: e=" + errMsg);
            }
        });

RxJava 这种连着调用的写法是可以拆分的,那我们简单的拆分下,方便看:

Single<String> stringSingle = Single.just("1");  // 创建一个新的上游 Single 对象
SingleObserver<String> observer = new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        mDisposable = d;
        Log.e(TAG, "onSubscribe: ");
    }

    @Override
    public void onSuccess(String s) {
        Log.e(TAG, "onSuccess: s=" + s);
    }

    @Override
    public void onError(Throwable e) {
        ......
        Log.e(TAG, "onError: e=" + errMsg);
    }
};
stringSingle
        .subscribe(observer);

看下 just()方法做了什么事情

    // Single.java#just() 源码
    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "item is null");// 验空,不为空返回自己,为空抛异常
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }


    // 这个方法就是判空处理,如果 object 为空,抛异常程序终止;否则返回 object 自己
    public static <T> T requireNonNull(T object, String message) {
        if (object == null) {
            throw new NullPointerException(message);
        }
        return object;
    }

可以看到 just()就两行代码,一个验空方法和一个钩子方法,下面简单瞄一眼 onAssembly(),默认情况下不用关注它,不影响主流程:

    // RxJavaPlugins.java # onAssembly()
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
            // 这是一个钩子方法,默认情况下,就是我们什么也不做的话,默认 onSingleAssembly == null
        // 即 我们传进来的什么,就返回什么
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

那么就剩下一个了,就是 RxJavaPlugins.onAssembly(new SingleJust<T>(item))方法的参数,创建的一个新的对象

// SingleJust.java 
public final class SingleJust<T> extends Single<T> {
    final T value;
    // 这个 T 就是上游生产的事件,这里保存下来了
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
      // 直接调用 onSubscribe()和 onSuccess() 方法
      // 因为 Single.just(T) 方法时没有延时,没有后续事件的上游被观察者,不会执行失败,一旦开始订阅(subscribe())就会立马执行完毕
      // 注意这里是不会回调 onError()方法的,因为 Single过于简单,根本不会失败
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

SingleJust.java实际上就把上游的值,给保存了下来,然后有一个 subscribeActual()方法,执行了方法参数 observer 的 onSubscribe()和 onSuccess()方法。

嗯,到目前 Single.just("1")这句代码就干了两件事,new 了一个 SingleJust 对象,并且把字符串 "1"给存下来了。

继续往下看订阅代码 stringSingle .subscribe(observer); subscribe 源码

  // Single.java#subscribe()
    // 实际上这里进了 SingleJust 里边去了,just()方法创建的 SingleJust 对象
    @SchedulerSupport(SchedulerSupport.NONE)
  @Override
  public final void subscribe(SingleObserver<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");  // 判空
        observer = RxJavaPlugins.onSubscribe(this, observer); // 下钩子,默认返回自己
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        try {
            subscribeActual(observer);   // 关键代码在这里,调用了实际的 subscribe 方法,把下游的 observer 传了进去
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }


Single.just("1").subscribe() 这里就调用了上一步创建的 SingleJust 对象的 subscribeActual(observer)方法,就把上游生产的事件成功发送到了下游的 observer 了

subscribe()方法代码不多,并且有两个眼熟的判空代码,和一个钩子方法,都不用管,重点就是 subscribeActual(observer);这一句,实际上是执行到了上一步我们创建的SingleJust对象的.subscribeActual(observer)方法里边去了,只是简单的把下游的观察者对象 observer 对象作为参数传了进去。我们再来瞄一下 subscribeActual(observer);

// SingleJust.java#subscribeActual()
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposables.disposed());
    observer.onSuccess(value);
}

纳尼?直接就调用了传进来的 observer 的两个方法,即下游的 onSubscribe()和 onSuccess()方法就正常执行了,这个 Single 太简单了,简单到就不会执行下游的 onError()方法,下游一开始订阅(.subscrive()),事件就从上游直接传到了下游;

再来看下 Disposables.disposed()是怎么 disposed 的,

// Disposables.java#disposed()
@NonNull
public static Disposable disposed() {
    return EmptyDisposable.INSTANCE;
}

// 好继续往下跟踪,是一个枚举类
public enum EmptyDisposable implements QueueDisposable<Object> {
    
    INSTANCE,
    NEVER;

    @Override
    public void dispose() {
        // no-op
    }

    @Override
    public boolean isDisposed() {
        // 始终返回 true,即一个 disposed 的状态
        return this == INSTANCE;
    }
    // ……
}


当你调用 mDisposable.dispose();切断上下游的事件时,什么作用都没有,因为这已经是一个被丢弃了的状态了,只是 Single 这里是这么简单处理的,因为用 Single 来发或生产事件是没有延时,也没有后续事件的,订阅的一瞬间就执行完毕了。我们甚至没看到下游观察者的 onError() 就没被调用。

来对 Single 做个小总结:

  1. 链的上游:生产者 Single
  2. 链的下游:观察者 SingleObserver
  3. Single.just(T) 会创建一个新的上游对象 SingleJust,并把我们的上游事件保存下来
  4. 一旦开始订阅(.subscribe),在 SingleJust 内部就会把事件直接交给 下游的 observer 处理

我们来看一个比较常用的操作符, map

Single.just(1)
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return String.valueOf(integer);
            }
        })
        .subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onSuccess(String s) {
                Log.e(TAG, "onSuccess: s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: e=" + e.getMessage());
            }
        });

这个简单小实例中是把 int事件->String事件,看看是事件是怎么转换的;

// Single.java#map()
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}

// 哇 好眼熟,一个判空,一个钩子,不管,我们只看 创建的 SingleMap里边做了什么
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;  // 上游的事件生产者
    final Function<? super T, ? extends R> mapper;  // map 操作符,事件变换函数

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        // 同样的,这里的执行时机是下游的观察者订阅了事件的一瞬间
        // 这个 source 在这个例子中,就是上游新建的 SingleJust对象,把上游和下游连接了起来
        // 这个t 就是下游我们写的观察者,订阅的时候传了进来
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {
                // ......
    }
}
// SingleMap 的内部类
static final class MapSingleObserver<T, R> implements SingleObserver<T> {

    final SingleObserver<? super R> t;  // 下游的 observer
    final Function<? super T, ? extends R> mapper;  // 事件变换函数

    MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
        this.t = t;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Disposable d) {
      // 这里调用时机,还记得么?就是新创建的上游 SingleJust.java#subscribeActual()里调用的
        t.onSubscribe(d);
    }

    @Override
    public void onSuccess(T value) {
        R v;
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
            return;
        }

        t.onSuccess(v);
    }

    @Override
    public void onError(Throwable e) {
        t.onError(e);
    }
}


MapSingleObserver个人理解是装饰者模式的一种体现,它是对下游 observer 的一种装饰,即对被装饰对象.subscribe(new SingleObserver<String>() {})匿名内部类的功能的一种增强。

画张图描述一下 不加 map 操作符时,代码的执行流程:

简单流程

再看下 加了 map操作符之后的流程,稍微复杂了点

加了map操作符后

当加了一个 map 操作符后,会创建一个新的上游对象 SingleMap,然后新对象包裹旧对象(SingleJust),同时会创建一个新的下游观察者 MapSingleObserver,同样包裹真实的本例中下游观察者SingleObserver;一旦订阅发生(.subscribe()),事件的执行流行为:

SingleMap 会把事件往上传给 SingleJust,SingleJust(MapSingleObserver 回调 onSubscribe 和 onSuccess),然后往下传 MapSingleObserver(回调给真实的 observer.onSubscribe() 和 变换后回调给 observer.onSuccess ) 到此结束。

最后的总结开篇已经总结过了,至此我们已经明白了几件事:

  1. 链的上游是如何发送数据的
  2. 链的下游是如何接收数据的
  3. 如何将上下游进行关联起来的
  4. 简单的操作符 map是如何工作的

事实上只有当下游发生订阅事件(调用.subscribe()) 时,上游才会发事件;同时会调用onSubscribe(Disposable)方法,如果链的中间有节点,那么会创建一个新的上游对象,然后包裹住老的上游对象,每个中间节点作用既是它上游的 observer,又是它下游的 Observable。

下一篇会介绍下,线程切换和事件中断丢弃相关源码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容