Rxjava2之Fusion图解深入理解篇

最近看源码的时候,经常会看到FusionMode。 这玩意网上介绍比较少,粗看也比较复杂,但因为较多运算符中都用到了它,所以此篇决定选择几个代表运算符对它做一下分析。(图看不清楚点开看大图吧。。orz)

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

Flowable.range(1, 5)

              .flatMapIterable(mapper, 2)

              .subscribe(ts);

我们以上面的test code作为主要例子进行一步步肢解分析。最初看可能会比较晕,但这玩意多看几次就可以串起来了> < 自己的笔记记录,写的可能不是很流畅 见谅。

FlowableRange

image.png

这里的分析与 Rxjava图示入门篇的方法一致。这里先介绍三个重要的类:

BaseRangeSubscription

我们知道,subscription其实可以看成上游和下游中间的连接点。下游拿到subscription可以随时取消订阅,而在这节,我们会发现他还有另一个极其重要的作用---与FusionMode的结合。BaseRangeSubscription继承于BasicQueueSubscription,它向外界提供三个接口:

  • requestFusion(mode)
  • request()
  • poll()
requestFusion

外界调用此接口时,会带着自己希望请求的Mode,这个Mode到底是什么后面再说。subsciption收到这个请求的mode后,根据自己的心情返回新的Mode给请求方。这里BaseRangeSubscription就返回了mode&SYNC

request

外界调用此接口时,代表向subscription请求n个item。BaseRangeSubscription收到该请求时,根据n是否=max,调用fastPathslowPath。这两个方法又是干什么的呢?先暂时忽略。

poll

外界调用此接口时,代表希望从subscription中获取item。可以看到FlowableRange就返回index,假设range是[0,10],那就依次会返回0,1,...

其实一般与FusionMode有关的subscription都会提供这三个接口,后面我们会看到这三个接口分别起到什么重要的作用。

RangeSubscription & RangeConditionSubscription

这两货主要为FlowableRange实现了前面接口中的fastPathslowPath,根据图示可以看到本质都是调用了根据request(n)中请求的数量依次调用subscriber.onNext(index)

FlowableFlattenIterable

image.png

这货与以前介绍过的flatMap类似。传入一个mapper,

FlattenIterableSubscriber

当其onSubscribe被调用时,该subscriber会观察与上游连接的subscription是否为QueueSubscription。是不是似曾相识,这个QueueSubscription在前面介绍FlowableRange时提过。如果与上游连接的subscriptionQueueSubscription,就会调用前面的接口1requestFusion(ANY)。看上游返回什么Mode,这里会返回两类,分别为SYNCASYNC

sync

假设上游返回SYNC,done = true。代表它们之间的交互为同步。

if (m == SYNC) {
    fusionMode = m;
    this.queue = qs;
    done = true;

    actual.onSubscribe(this);

    return;
}

这里看到存在一个变量为queue,下游可以通过queue向上游获取数据

aysnc

假设上游返回ASYNC,代表上下游之间的交互为异步。subscription.request(prefetch)。调用接口2。根据前面的介绍会调用slowPath,即调用FlattenIterableSubscriber.onNext

这里我们看onNext:上代码吧

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (fusionMode == NONE && !queue.offer(t)) {
        onError(new MissingBackpressureException("Queue is full?!"));
        return;
    }
    drain();
}

这里因为是async,所以不是done

  • ASYNC 进入drain()

  • NONE 如果队列已经满了抛出错误

FlattenIterableSubscriber也和queue有关,它也提供之前介绍过的三个接口。

在讲两个运算符结合之前,我们最后讲一下drain()到底干了什么吧!

drain

前面提到过,subscription有一个this.queue队列。drain会依次通过队列向上游请求item,通过iterable = mapper.apply(t);得到iterable,然后再遍历iterable,调用下游subscriber.onNext/onComplte,完成数据流动过程。

两个操作符熔合

终于到了激动人心的时刻。这两个操作符如果合在一起会产生什么神奇的效果呢?数据到底是怎么流动的呢?

再放一次要介绍的例子:

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

Flowable.range(1, 5)

              .flatMapIterable(mapper, 2)

              .subscribe(ts);
image.png

当调用subscribe(ts)时首先调用FlowableFlatMapIterable.subscribe,此时自己看代码发现调用了source.subscibe(FlattenIterableSubscriber),即将上游FlowableRange与中游的FlattenIterableSubscriber结合。中游发现有人要给自己发数据了,就调用自己的onSubscribe,即FlattenIterableSubscriber.onSubscribe(BaseRangeSubscription),此时BaseRangeSubscription作为上游和中游之间的connection

onSubscribe

这时候发生的事情其实前面都介绍过了。但我自己都快忘了,重述一遍,此时中游的FlattenIterableSubscriber会通过connection BaseRangeSubscription请求Mode,这里BaseRangeSubscription返回SYNC,代表上游只支持同步模式。前面只简单介绍了SYNC,这里再重点介绍一下。


if (m == SYNC) {
    fusionMode = m;
    this.queue = qs;
    done = true;

    actual.onSubscribe(this);

    return;
}

这里会继续调用realSubscriber.onSubscribe,我们先忽略下游也会有mode请求的情况,

long mr = missedRequested.getAndSet(0L);
if (mr != 0L) {
    s.request(mr);
}

此时我们的testSubscriber会通过中游与下游之间的connectionFlattenIterableSubscriber向中游请求数据。此时会调用前面介绍过的drain,而调用drain时,中游会主要通过上游与中游之间的connectionBaseRangeSubscription.poll即接口三要求上游同步返回数据给它,数据返回后再依次返回给下游。

耶其实到这里就完成了整个数据的流动。是不是其实也不怎么复杂(...别欺骗自己了)

到这里我们可以说,到底mode是什么了。拿住官方解释吧。

/**
* Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In synchronous fusion, all upstream values are either already available or is generated
* when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
* that is the indication if a terminated stream.
* In this mode, the upstream won't call the onXXX methods and callers of
* {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
* to be called sequentially (from within a serializing drain-loop).
*/

先介绍SYNC,说明当上游告诉下游它支持同步时,下游只会通过poll求取数据,取一个给一个,如果上游返回null,说明整个数据流已经被中断了。所以在这种模式下,上游是不会主动调用下游的onXXX的。

ASYNC又是什么呢?

ASYNC

我们再走一遍async的流程吧!

if (m == ASYNC) {
    fusionMode = m;
    this.queue = qs;

    actual.onSubscribe(this);

    s.request(prefetch);
    return;
}

看到async时,中游已经主动向上游去预先取数据了,所以有prefetch
的概念。因此此时,上游就会去调用中游的onNext。看到这里可以发现asyncsync的部分不同了,sync只会通过poll去主动拉数据,而async是会去主动request数据让上游主动调用自己的onNext方法。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (fusionMode == NONE && !queue.offer(t)) {
        onError(new MissingBackpressureException("Queue is full?!"));
        return;
    }
    drain();
}

onNext中看到依旧调用了drain,说明中游收到上游的通知时会再通过poll去拉取上游给它准备的数据。

好吧,那现在让我们看看ASYNC的官方解释吧

/**
* Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
* for an accepted mode.
* <p>
* In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
* Upstream signals onError() and onComplete() as usual but onNext may not actually contain
* the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
* that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
* (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
* prepared to catch exceptions.
*/

在异步模式中,上游会通过onNext通知下游去取数据,此时下游就会通过poll去取上游给自己的准备的数据。

可以看到,一个是下游主动同步去取,一个是上游发通知并携带数据给下游。下游其实是不知道数据什么时候会来的。

注释其实还有另一层意思,如果上游给下游发了onNext(null),说明下游此时要通过poll主动去取。这个又是什么意思呢?这里的async例子其实不怎么好,因为FlowableRange只支持sync,所以我们换个例子再看看

FlowabledoAfterNext

sync

TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.SYNC);

Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);

这里sync运用在中游与下游之间。可以看到,当modesync时,

if (initialFusionMode != 0) {
    if (s instanceof QueueSubscription) {
        qs = (QueueSubscription<T>)s;

        int m = qs.requestFusion(initialFusionMode);//sync
        establishedFusionMode = m;

        if (m == QueueSubscription.SYNC) {
            checkSubscriptionOnce = true;
            lastThread = Thread.currentThread();
            try {
                T t;
                while ((t = qs.poll()) != null) {
                    values.add(t);
                }
                completions++;
            } catch (Throwable ex) {
                // Exceptions.throwIfFatal(e); TODO add fatal exceptions?
                errors.add(ex);
            }
            return;
        }
    }
}

while ((t = qs.poll()) != null) {
values.add(t);
}

所以再次印证,同步都是靠下游主动去拉的。

重点关注async:

async

TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC);

Flowable.range(1, 5)
.doAfterNext(afterNext)
.subscribe(ts0);
actual.onSubscribe(s);
long mr = missedRequested.getAndSet(0L);
if (mr != 0L) {
    s.request(mr);
}
image.png

进入DoAfterSubscriber.request(mr)->UnicastProcessor.request(mr)->drain->a.onNext(null)

if (establishedFusionMode == QueueSubscription.ASYNC) {
    try {
        while ((t = qs.poll()) != null) {
            values.add(t);
        }
    } catch (Throwable ex) {
        // Exceptions.throwIfFatal(e); TODO add fatal exceptions?
        errors.add(ex);
        qs.cancel();
    }
    return;
}

可以看到这里就是官方解释的验证。这里中游和下游是异步的,所以当下游发出request告诉中游自己想要n个item时,最终中游通过上游返回onNext(null),下游收到这个通知时才会主动去拉数据。显而易见,中游什么时候给数据下游其实是不知道的,这里onNext也没有像之前我们分析过的操作符一样把item传递给下游,只起了一个通知的作用。

其实这里我们还是忽略了很多东西,为什么上游会返回onNext(null),这里其实我们已经假定上中游也是异步模式了。这是因为FlowableOnNext当收到下游要异步的请求时,会也向上游请求自己也要使用异步的模式。所以这里上游就直接返回了onNext(null),我们进一步研究代码会发现如果中游向上游请求sync模式,那上游会返回onNext(t)

while (r != e) {
      boolean d = done;

      T t = q.poll();
      boolean empty = t == null;

      if (checkTerminated(failFast, d, empty, a, q)) {
          return;
      }

      if (empty) {
          break;
      }

      a.onNext(t);

      e++;
  }

总结

写到这里其实可以理解为什么要提供三个接口了,

  • poll 用于同步请求返回数据,或是异步通知下游来拿数据
  • request 用于当收到下游的请求时的操作,如果是同步直接会返回给下游数据,否则可能只是通知下游我有数据了你自己来拿吧。

操作符背后真的包含很多思想,这节只介绍了asyncsync,boundary还没看懂。。 后面还会继续研究操作符熔合,看懂了再写吧

如果哪里理解的不对,欢迎及时指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容