RxJava2源码初探-整体设计

RxJava2源码初探-整体设计

首先简单介绍Rxjava2 的四个基本的概念

Observable (可观察者,即被观察者)
Observer (观察者)
subscribe (订阅) 通过该方法,将 Observable 与 Observer 关联起来
事件 (包括 onNext,onComplete,onError 等事件)

Observable


/**
 * The Observable class is the non-backpressured, optionally multi-valued base reactive class that
 * offers factory methods, intermediate operators and the ability to consume synchronous
 * and/or asynchronous reactive dataflows.
 * <p>
 * Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
 * for such non-backpressured flows, which {@code Observable} itself implements as well.
 * <p>
 * The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
 * that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
 * overloads that allow setting their internal buffer size explicitly.
 * <p>
 * The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
 * <p>
 * <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
 * <p>
 * The design of this class was derived from the
 * <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive-Streams design and specification</a>
 * by removing any backpressure-related infrastructure and implementation detail, replacing the
 * {@code org.reactivestreams.Subscription} with {@link Disposable} as the primary means to dispose of
 * a flow.
 * <p>
 * The {@code Observable} follows the protocol
 * <pre><code>
 *      onSubscribe onNext* (onError | onComplete)?
 * </code></pre>
 * where
 * the stream can be disposed through the {@code Disposable} instance provided to consumers through
 * {@code Observer.onSubscribe}.
 * <p>
 * Unlike the {@code Observable} of version 1.x, {@link #subscribe(Observer)} does not allow external disposal
 * of a subscription and the {@code Observer} instance is expected to expose such capability.
 * <p>Example:
 * <pre><code>
 * Disposable d = Observable.just("Hello world!")
 *     .delay(1, TimeUnit.SECONDS)
 *     .subscribeWith(new DisposableObserver&lt;String&gt;() {
 *         &#64;Override public void onStart() {
 *             System.out.println("Start!");
 *         }
 *         &#64;Override public void onNext(String t) {
 *             System.out.println(t);
 *         }
 *         &#64;Override public void onError(Throwable t) {
 *             t.printStackTrace();
 *         }
 *         &#64;Override public void onComplete() {
 *             System.out.println("Done!");
 *         }
 *     });
 * 
 * Thread.sleep(500);
 * // the sequence can now be disposed via dispose()
 * d.dispose();
 * </code></pre>
 * 
 * @param <T>
 *            the type of the items emitted by the Observable
 * @see Flowable
 * @see io.reactivex.observers.DisposableObserver
 */
public abstract class Observable<T> implements ObservableSource<T> {
...
}

Observer

Observer 其实也是一个接口,里面定义了若干方法,onSubscribe ,onNext,onError,onComplete 方法。

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

一个正常的事件序列的调用顺序会是这样的 onSubscribe > onNext > onComplete,若中途出错了,那调用顺序可能是这样的 onSubscribe > onNext > onError
onSubscribe 方法,当我们调用 Observable 的 subscribe 方法的时候,会先回调 Observer 的 onSubscribe 方法,此方法的调用顺序先于 onNext,onError ,onComplete 方法。
onError 方法与 onComplete 方法可以说是互斥的,调用了其中一个方法就不会调用另外一个方法


基本使用

Observable
           .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("z");
                    emitter.onNext("h");
                    emitter.onNext("u");
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe():  ");
                }
                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext():  " + s);
                }
                @Override
                public void onError(Throwable e) {

                }
                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete():  ");
                }
            });

先来看 Observable 的 create 方法

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

在 create 方法中,其实很简单,只是对 source 进行判空处理,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate 是什么东西?

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

ObservableCreate 其实也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法。

接下来我们来看重点了,即 Observable 的 subscribe 方法,在该方法中,他会将 Observalble 与 observer 关联起来。

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    // 检查 observer 是否为 null,为 null 抛出异常
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
       // RxJavaPlugins 插件的,暂时不管
        observer = RxJavaPlugins.onSubscribe(this, observer);


      // 检查 observer 是否为 null,为 null 抛出异常
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribe 方法也比较简单,大概可以分为以下两步:

首先检查 observer 是否为空,为 null 抛出异常
第二步,调用 subscribeActual 方法,而我们知道在 Observable 类中 subscribeActual 是抽象方法,因此,我们只需要关注其实现类的 subscribeActual 方法。从上面的分析,我们知道,当我们调用 Observable create(ObservableOnSubscribe source) 方法的时候,最终会返回 ObservableCreate 实例。因此,我们只需要关注 ObservableCreate 的 subscribeActual 方法

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    ----
}

ObservableCreate 的核心代码主要也只有几行,source 是上游 ObservableOnSubscribe 的引用,而 CreateEmitter 这个类,它是 ObservableCreate 的一个静态内部类,实现了 ObservableEmitter,Disposable 接口 它持有 observer 的引用,当我们调用 CreateEmitter 的 next 方法的时候,它会判断当前的 CreateEmitter 有没有被 dispose 掉,如果没有,调用他持有的 observer 的 onNext 方法, 同理 onComplete 方法一一样,只不过执行完 onComplete 方法的时候,还会执行 dispose 方法,dispose 当前的 CreateEmitter。(dispose 方法这里先记住以下,下面会讲到)

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }

    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

好,看完上面的代码,我们回到 ObservableCreate 的 subscribeActual 方法,我们调用 observer.onSubscribe 方法的时候,会将 parent 对象作为方法参数暴露出去(而这个 parent 正是我们的 CreateEmitter,通过 CreateEmitter 的 dispose 方法可以取消订阅关系)。接着,当我们调用 source.subscribe(parent) 的时候,会调用 ObservableOnSubscribe 的 subscribe 方法。

   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }


因此,在我们上面的例子中,若不出错,调用顺序

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onComplete 方法,而不会调用 onError 方法)

若在调用 onNext 方法的过程中出错,那调用顺序可能是这样的

Observable subcrible > Observable subscribeActual > ObservableCreate subscribeActual > observer.onSubscribe > ObservableOnSubscribe subscribe(@NonNull ObservableEmitter emitter)
(emitter 是 CreateEmitter 的实例,包装了 observer,调用 emitter 的相应方法 ,会进而调用 observer 的 onNext onError 方法,而不会调用 onComplete 方法 )


observable 与 Observer 是如何取消订阅关系的

在上面讲解的时候,其实我们已经有提到 CreateEmitter 的 dispose 方法,该方法就是用来取消订阅关系的。

假设这样一个场景,当我们收到的 value 的值大于等于 2 的时候,这个时候认为是异常的,解决两者之间的订阅关系

 Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

    Observer<Integer> observer = new Observer<Integer>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d("zhxh", value.toString());
                if (value >=2) {   // >=2  时为异常数据,解除订阅
                    disposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

    observable.subscribe(observer); //建立订阅关系

总结

Rxjava 的原理其实不难,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

用一张简单的流程图描述如下:

RxJava2整体设计

参考

https://www.jianshu.com/p/1651c61254db

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351