从源码分析RxJava订阅过程

wdroid.jpg

都知道观察模式吧?

在开始之前让我们简单了解一下观察模式,就是某对象A的变化引起其他多个对象B变化,但是前提是你需要去订阅我,打个比方:就是我的状态发生了改变,那我怎么通知你呢?所以我需要知道的如何去通知其他对象说我这里已经改变了,你看看那需不需要做出改变。就比如微信的订阅号,如果你不订阅,那该订阅号在发布内容也不会通知,这里的订阅号就是被观察者,而用户就是观察者。那怎么说让这两者关联来呢?前面说的订阅号是要提供一个接口,允许用户去订阅的,所以最后就是被观察者和观察者两个都得提供接口,订阅号提供的接口让用户去订阅类比微信号,当订阅号发布内容,就通过这个微信号通知观察者,所以订阅就是这两者的关联点。

开始之前的两个重要的类或接口:ObservableObserver

  • Observable 它实现ObservableSource接口,通俗来讲Observable就是一个被观察者也有人叫可观察的资源,这里就叫被观察者;
  • Observer 观察者;
    涉及的类:


    RxJava2.png

订阅流程分析

开始RxJava的订阅流程分析之前,来个简单的栗子,代码如下:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

日志结果:

onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3

如上代码,之所以分开来写是为了更清晰的去理解每一步RxJava生成的相关类。

如果你认真看前面的内容,你一下就明白Observable.subscribe()方法也就是订阅的意思,是 ObservableObserver 的关联点,也就是被观察者和观察者的关联点,所以我们的分析就从Observable.subscribe(Observer observer)方法开始代码如下:

 public final void subscribe(Observer<? super T> observer) {
    try {

        // .....此处省略几亿代码....

        //此方法在Observable类是中是抽象的,注定是子类实现
        subscribeActual(observer);

        // .....此处省略几亿代码....
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
       // .....此处省略几亿代码....
    }
}
  • 上面代码不难理解在subscribe方法中直接就调用了subscribeActual(observer)方法,我可以翻译为 实际订阅;
  • subscribe方法是Observable类的方法,他是抽象类,传入了一个 Observer 对象,开始的时候栗子我们可以知道Observable是通过我们调用Observable.create(ObservableOnSubscribe) 所创建出来的;
  • 那subscribeActual在Observable中是抽象方法,肯定是子类去实现了该方法,从第二点知道子类肯定是在Observable.create(ObservableOnSubscribe)中给new出来的,那么接下我们看看Observable.create(ObservableOnSubscribe)方法的实现;

// Observable.create(ObservableOnSubscribe)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // .....此处省略几亿代码....

    // 直接就创建了ObservableCreate,并把source作为参数传进去
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

// onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们从上面代码我们知道在Observable.create(ObservableOnSubscribe)中直接就创建了ObservableCreate,而ObservableCreate是Observable的子类,并把source作为参数传进去,最后调用RxJavaPlugins.onAssembly方法,我们默认返回ObservableCreate实例,所以Observable.create方法最后返回的是ObservableCreate实例,所以就验证了上面的第三点实际调用的是ObservableCreate.subscribeActual(observer)方法,这是在不考虑其他变换和线程切换的情况,那我们就来看看ObservableCreate.subscribeActual(observer)方法的实现,代码如下:

 @Override
protected void subscribeActual(Observer<? super T> observer) {
    //事件发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //直接回调Observer的onSubscribe方法,这个方法是和线程切换无关,只在当前的线程中执行
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

代码不多,也很好理解:

  • 首先调用observer.onSubscribe(parent)方法通知Observer已经订阅成功了。
  • 最后调用source.subscribe(parent)方法完成订阅,source又是什么呢?我们知道在ObservableCreate是在Observable.create方法时创建的,并把ObservableOnSubscribe传进来,所以source就是ObservableOnSubscribe,直接回调ObservableOnSubscribe.subscribe方法并把CreateEmitter作为参数传递进去,之后再我们是栗子中通过这个对象调研onNext方法或者onComplete方法发射事件;

看一下CreateEmitter的实现,代码如下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    //通过构造方法注入 观察者实例
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

   // .....此处省略几亿代码....

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

   // .....此处省略几亿代码....
}

为了简介清晰我删掉很多无关代码,只保留onNext等这些相关的方法。

  • 其实CreateEmitter是Observable的静态内部类,
  • 在上面我们知道Observable.subscribeActual方法中创建了CreateEmitter实例并将Observer作为参数通过构造方法注入Observer实例,作为CreateEmitter的成员变量;
  • 之后在subscribeActual方法中调用ObservableOnSubscribe.subscribe的方法并把CreateEmitter实例作为方法参数传递进去;
  • 简单来说CreateEmitter的作用就是发射事件,里面分装了Observer实例,发射事件就回调到Obsever中的方法,如onNext等方法;

有没有发现从一开始我们就仅仅讲了从Obsevable的创建到订阅,这是比较汉理解的,如果我增加一个map或线程切换呢?这里暂时不展开讲线程切换。

重新把栗子的代码在贴一遍:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
        @Override
        public Object apply(String s) throws Exception {
            Log.e("tag", "map");
            return "aa";
        }
    })

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

如上代码,订阅流程会和之前的有什么不一样呢?那么我们看个究竟,就从 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())开始,代码如下:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //这里把上游this传进去也就是source,以便调用上游的subscribe方法
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

从上面代码看,我们知道在map方法中创建了ObservableMap并把上游的Observable参进去了,而我们知道从Observable.subscribe方法开始订阅就会调用 subscribeActual(observer)方法,所以在Observable.subscribe之后就会调用ObservableMap的subscribbeActual方法,代码如下:

 public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

在ObservableMap的subscribbeActual方法中,直接调用传进来的Observable的subscribe方法又间接调用subscribbeActual方法没所以,订阅的过程实际上是一样的。

总结

  • Observable是由上游往下游传递的,并且每个操作符都会创建新的Observable对象包裹上游的实例;
  • Observer是由下游往上游传递的,也就是从Observable.subscribe方法开始。

流程图:


流程图.png

时序图:
rxjava.png

上图包括订阅流程、线程切换 以及 事件发布流传的过程,非常详细。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352