RxJava背压原理

作为RxJava的典型特性之一,背压,我们知道他是在生产者与消费者速率不一致时做的一种数据处理策略,只是在想,它是如何实现的?内部原理是什么?

还是从源码入手,撸完源码,想必能解开自己的疑惑。

背压分析

1 来一段背压控制的代码。

        Flowable.range(1,100_000)
                .onBackpressureDrop()
                .subscribe(s-> {
                    Thread.sleep(100);
                    System.out.println(s);
                });

2 根据前面的分析,知道Flowable.range会产生一个新的Observable,但是对于Flowable则是产生了FlowableRange。

new FlowableRange(start, count)

3 FlowableRange扩展了Flowable,调用其onBackpressureDrop()方法,内部实现为。

// this即上一步的FlowableRange。
// 内部的source = FlowableRange
new FlowableOnBackpressureDrop<T>(this)

4 subscribe方法,类似于Observable,只是这里为LambdaSubscriber。然后subscribe内部调用的是subscribeActual方法,参数为LambdaSubscriber

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
                // onNext为我们传入的Consumer类
        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }


5 查看FlowableOnBackpressureDrop的subscribeActual方法实现。其实不难理解这里调用到的是第一步的FlowableRange的subscribeActual方法,参数为new BackpressureDropSubscribe。

// FlowableOnBackpressureDrop
// s = LambdaSubscriber
// onDrop = this = FlowableOnBackpressureDrop实例
this.source.subscribe(new BackpressureDropSubscriber<T>(s, onDrop))

6 看下FlowableRange的subscribeActual方法。

    @Override
    public void subscribeActual(Subscriber<? super Integer> s) {
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new RangeConditionalSubscription(
                    (ConditionalSubscriber<? super Integer>)s, start, end));
        } else {
                // 走这段逻辑,上面代码忽略
                // s = BackpressureDropSubscriber
                // start,end为开始时传入的参数,当前的案例start = 1, end = 100_000
            s.onSubscribe(new RangeSubscription(s, start, end));
        }
    }

7 调用BackpressureDropSubscriber的onSubscribe方法。参数为new RangeSubscription

// BackpressureDropSubscriber
            @Override
        public void onSubscribe(Subscription s) {
                        // upstream为RangeSubscription
                        // downstream为LambdaSubscriber
            if (SubscriptionHelper.validate(this.upstream, s)) {
                this.upstream = s;
                downstream.onSubscribe(this);
                s.request(Long.MAX_VALUE);
            }
        }

8 查看LambdaSubscriber的onSubscribe方法。

// LambdaSubscriber
        @Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.setOnce(this, s)) {
            try {
                    // 因为这里我们没有实现onSubscribe方法,这里什么都没处理。
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.cancel();
                onError(ex);
            }
        }
    }
image-20191217212345843

9 在BackpressureDropSubscriber的upstream中request方法,其根据n值走那条路径,在我们的代码中走的是fastPath。

 @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                if (BackpressureHelper.add(this, n) == 0L) {
                        // 走fastPath
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    } else {
                        slowPath(n);
                    }
                }
            }
        }

10 看一下fastPath内容,实际上就是循环的将range中的代码丢给下游去处理。

        @Override
        void fastPath() {
                // f = 100_0000
            int f = end;
            Subscriber<? super Integer> a = downstream;
                        // index = 1,不断的调用下游的onNext方法                   
            for (int i = index; i != f; i++) {
                if (cancelled) {
                    return;
                }
                a.onNext(i);
            }
            if (cancelled) {
                return;
            }
            // 最终调用下游的onComplete方法。
            a.onComplete();
        }
    

11 BackpressureDropSubscriber在接收到上游的这些数据后如何处理的呢?

              @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // 在每一次调用downstream.onSubscribe方法的时候都会请求上游一个数字,默认是Long.Max
            long r = get();
            if (r != 0L) {
                    // 调用下游的观察者去处理
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                try {
                    onDrop.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    cancel();
                    onError(e);
                }
            }
        }

12 背压的工具类处理。

BackpressureHelper.produced(this, 1);
public static long produced(AtomicLong requested, long n) {
        for (;;) {
                // 如果设置为默认的Long.Max_value的时候不会进行背压控制
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            // 当前值每次消耗1个,直到小于0,那么启用背压策略。
            long update = current - n;
            if (update < 0L) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            if (requested.compareAndSet(current, update)) {
                return update;
            }
        }
    }

13 可以看出来在创建FlowableSubscriber时如果指定了要请求的数据量,那么上游返回的数据就可以得到控制。

修改最开始的代码如下:

Flowable.range(1, 100_000)
                .onBackpressureDrop()
                .subscribe(new FlowableSubscriber() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(100);
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("on error");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("complete");
                    }

                });

        Thread.sleep(10000);
// 最终只会打印到100

14 因为这里是同步处理的逻辑,上游生产的数据会同步的被下游处理,而在同步状态下,下游只好指定自己准备消耗多少数据。

其实背压就是在进行Subscription的时候指定自己要处理多少数据,如果这些数据已经全部下发到下游了,后续继续下发的数据就会被背压策略处理。

简述

上面代码分析比较繁琐,看完之后,我觉得背压处理逻辑是这样:

Subscriber: 开始subscribe了,我想要100个数据。

Upstream: 不关心下游的状态,不断发送数据,每次发送的时候,Subscriber设置的请求100个数据都会减少一个,直到最终为0. 当request的数量减少到0的时候,代表发给下游的数据已经超出下游想要处理的数据。

Subscriber:我这边处理完了100个数, 还想要一些数据,再去request一些数据。

Upstream:看到request的数据又不为0了,继续发数据给下游。

Subscriber:我不想要数据了,调用cancel()

upstream: 已经cancel了,那么数据就不再下发了。

upstream:我发现我的数据已经全部给下游了,调用下游的onComplete处理。

最后

背压处理,其实就是让消费者通知生产者自己想要处理多少数据,或者能够处理多少数据,然后上游的数据源收到了消费者请求的内容,做自己的策略处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,198评论 6 514
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,334评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,643评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,495评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,502评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,156评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,743评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,659评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,200评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,282评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,424评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,107评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,789评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,264评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,390评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,798评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,435评论 2 359

推荐阅读更多精彩内容