Rxjava 过程分析一之简易流程

Rxjava 过程分析一

说明

  • 该文章是基于 Rxjava2 源码。
  • 该篇只是讲述 Rxjava 建议的用法,不涉及操作符和线程切换, 后两个会有新的篇幅去写。 一步一步的来。
  • 在源码中那些判空还有 Rxjava 中 RxJavaPlugins 钩子等在分析中去除(只关注用法和思想, 和主流程不管的暂时剔除)。
  • 由于习惯, 和 Rxjava2 中的命名。 我称 emitter 为上游, 也就是发射水(数据)的源头, 结果回调给外部的 FlowableSubscriber, 我称它为下游。 上游流水流到下游!

最简单的使用

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
         // emitter.onNext("");
        // emitter.onError();
       // emitter.onComplete();
    }
}, BackpressureStrategy.LATEST).subscribe(new FlowableSubscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
    }
    @Override
    public void onNext(String s) {
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }
});

引发的思考

  1. 调用 emitter 的 onNext、 onError、 onComplete, 就会回调 FlowableSubscriber 中对应的方法。 那么这两个对象是一个吗? 有什么联系呢?
  2. 我们把上述代码写好后, 会自动调用并回调, 那么上游发射器 emitter 是什么时候触发的呢? 该方法是什么时机和谁调用的呢?

源码分析

从创建开始

 public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    return new FlowableCreate<T>(source, mode);
}

简单吗大兄弟, 仅仅是创建了一个 FlowableCreate 类而已。 并对成员变量赋值。

简单说说下游

在开发中, 可能很多都有在用回调吧。 再次机会我也想说说回调是咋回事。 其实 java 中的常用的内部类回调, 还是 c 的函数指针, 或者其语言的闭包(swift), 其实不要把它们想的多么神奇。 就这么想, 我把一个实例地址或者函数地址给你了, 你在内部去调用我的方法, 自然就运行到了外面了。

订阅

 public final void subscribe(FlowableSubscriber<? super T> s) {
    try {
        Subscriber<? super T> z = s;
        subscribeActual(z);
    } catch (NullPointerException e) { 
        throw e;
    } catch (Throwable e) {

    }
}

信息量很少, 只是调用了当前 Flowable 的 subscribeActual() 方法。 我们前面知道当前的 Flowable 是 FlowableCreate 对象, 所以进 FlowableCreate 中去看看做了什么事情。

 public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;

    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
    }
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
    }
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
    }
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
    }
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
    }
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        emitter.onError(ex);
    }
}

我们这里不去讨论背压等问题, 所以我们只是关注主要流程和关键方法。 其中一眼就可以看到关键的一个就是在 try 块中的 source.subscribe(emitter); source 是什么呀? 就是我们在 new FlowableCreate 传进来的 Flowable.create(new FlowableOnSubscribe<String>()) 中 FlowableOnSubscribe 对象。 source 的 subscribe 这不就是运行了 外部 FlowableOnSubscribe 的 subscribe嘛, 所以外部调用 onNext, onError, onComplete 方法, 其实调用了 内部 emitter 中对应的方法。 我们以背压为 LATEST 为例看看 LatestAsyncEmitter 被调用的方法做了什么事情。 先多说一句, 初始化 emitter 时我们传入的是下游哦, 下游相应的方法调用了, 那么外部的就会看似回调出去拿到结果了!

我们以 onNext 为例, 看看 LatestAsyncEmitter 被调用到 onNext 做了什么事情。

 public void onNext(T t) {
    queue.set(t);
    drain();
}

看到是先把结果存到了队列中, 我们不考虑背压, 所以我们看主要的大致流程哈。 显然下一个有用的代码就是 drain() 了。

 void drain() {
    final Subscriber<? super T> a = downstream;
    final AtomicReference<T> q = queue;
        // ......
        T o = q.getAndSet(null);
        // ......
        a.onNext(o);
        // ......
}

其中 downstream 就是我们外部的 FlowableSubscriber 及下游了。 我们可以看到, 简单的从队列中取出数据, 直接调用了下游的 onNext。 就这样数据就被从上游流向了下游。

前面的疑惑问题

  • 上游和下游是一个东西吗? 它们的关系是什么?

这个问题从上面的分析已经很明显了。 上游和下游不是一个东西, 上游 emitter 调用相应的方法去回调下游的方法。

  • 在哪一个时刻触发的事件流动呢?

其实是在上游 emitter 调用相应的方法那一刻, 比如调用 onNext。 那么是在哪一个时机触发调用的呢? 很明显是在订阅时, 调用了 subscribeActual 中又调用了上游的 subscribe(emitter) 触发了数据的流动。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容