RxJava源码分析(1)

RxJava源码分析(1)

Rxjava相信大家都不陌生,是现在很流行的一种解决异步通信的框架,分析源码,不会对RxJava2的源码逐字逐句的阅读,只寻找关键处,我们平时接触得到的那些代码进行分析。
分析的源码版本为:2.0.1

我们的目的:

  1. 知道源头(Observable)是如何将数据发送出去的。
  2. 知道终点(Observer)是如何接收到数据的。
  3. 何时将源头和终点关联起来的
  4. 知道线程调度是怎么实现的
  5. 知道操作符是怎么实现的

本文先达到目的1 ,2 ,3。
我个人认为主要还是适配器模式的体现,我们接触的就只有Observable和Observer,其实内部有大量的中间对象在适配:将它们两联系起来,加入一些额外功能,例如考虑dispose和hook等。

首先看最基础的应用

  Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d(TAG,"currentThread="+Thread.currentThread().getName());
            e.onNext("11");
            e.onComplete();
            e.onError(new Throwable("cuowu"));
        }
    });
    observable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG,"Disposable="+d.isDisposed());
        }
        @Override
        public void onNext(String value) {
            Log.d(TAG,"currentThread="+Thread.currentThread().getName());
            Log.e(TAG,"onNext:"+value);
        }
        @Override
        public void onError(Throwable e) {
            Log.e(TAG,"onError:"+e.getMessage());
        }
        @Override
        public void onComplete() {
            Log.e(TAG,"onComplete");
        }
    });

这是我们最普通的使用方式。

再根据使用方法分析源码

从create方法开始分析,首先看里面的参数ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

可以看到它里面就一个方法subscribe,这个方法正是我们重写的那个方法。再看它里面的参数ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(Disposable d);   
    void setCancellable(Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
}

这几个方法和dispose有关,我们可以暂时不予考虑,接着看看它的父类Emitter

 public interface Emitter<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

终于看到我们熟悉的方法了,这就是我们在示例中调用到的几个方法。参数分析完毕,就该来看看具体的流程了

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //判null方法我们可以忽略
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins.onAssembly()是一个hook方法,暂时先不去考虑它,也就是说我们需要关注的就是new ObservableCreate<T>(source)这个方法,ObservableCreate是一个继承自Observable的类,它的构造方法如下,可以看到,将传入的参数ObservableOnSubscribe保存起来。

final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
    //在这里就是将ObservableOnSubscribe保存起来
    this.source = source;
}

至此,整个创建流程结束。再分析订阅的流程:

 public final void subscribe(Observer<? super T> observer) {
    //判null操作
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //hook方法
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //判null操作
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //真正执行订阅的方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

从名字可以看出来,真正订阅的地方,但是这里我们需要判断Observable是哪一个了,还记得我们创建返回的是哪个Observable么?就是这个new ObservableCreate<T>(source),下面就该看它的subscribeActual方法了。

protected void subscribeActual(Observer<? super T> observer) {
    //在这里创建的CreateEmitter实现了Disposable接口,暂时可以看成是一个Disposable,且通过传入的observer将
    //observer保存起来
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //首先调用onSubscribe方法,传入Disposable
    observer.onSubscribe(parent);
    try {
        //ObservableOnSubscribe这个源头和下游真正产生联系的方法,这时候源头才开始发送数据
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

source.subscribe(parent)我们知道我们重写了一下3个方法:

         e.onNext("11");
         e.onComplete();
         e.onError(new Throwable("cuowu"));

这里的e就是parent即CreateEmitter,分别看它的onXXX()方法:

   public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        //在这里判断是否Disposed,当调用了onError和onComplete会设为false,所以执行完这两个方法中任何一个后,源
        //头将与下游断开连接,所以onError和onComplete是互斥的。
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
        //通过这个可以知道,当先调用了onComplete方法再调用onError方法就会报错。
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

到这里一个最简单的流程结束。

总结

  • 在subscribeActual()方法中,源头和终点关联起来。
  • source.subscribe(parent);这句代码执行时,才开始从发送ObservableOnSubscribe中利用ObservableEmitter发送数据给Observer。即数据是从源头push给终点的。
  • CreateEmitter 中,只有Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
  • Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()。根据上一点,验证此结论。
  • 先error后complete,complete不显示。 反之会crash
    在这里结束了一个最基本使用流程的源码分析,接下来,我们将分析一下常用的操作符以及线程转换。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容