聊聊reactive streams的backpressure

本文主要研究下reactive streams的backpressure

reactive streams跟传统streams的区别

    @Test
    public void testShowReactiveStreams() throws InterruptedException {
        Flux.interval(Duration.ofMillis(1000))
                .take(500)
                .subscribe(e -> LOGGER.info("get {}",e));

        Thread.sleep(5*60*1000);
    }

输出实例如下:

18:52:34.118 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
18:52:35.157 [parallel-2] INFO com.example.demo.FluxTest - get 0
18:52:36.156 [parallel-2] INFO com.example.demo.FluxTest - get 1
18:52:37.156 [parallel-2] INFO com.example.demo.FluxTest - get 2
18:52:38.159 [parallel-2] INFO com.example.demo.FluxTest - get 3
18:52:39.157 [parallel-2] INFO com.example.demo.FluxTest - get 4
18:52:40.155 [parallel-2] INFO com.example.demo.FluxTest - get 5
18:52:41.154 [parallel-2] INFO com.example.demo.FluxTest - get 6
18:52:42.158 [parallel-2] INFO com.example.demo.FluxTest - get 7
18:52:43.157 [parallel-2] INFO com.example.demo.FluxTest - get 8
18:52:44.156 [parallel-2] INFO com.example.demo.FluxTest - get 9
18:52:45.154 [parallel-2] INFO com.example.demo.FluxTest - get 10

传统的list streams不是异步的,好比如一批500件的半成品,得在A环节都处理完,才能下一个环节B,而reactive streams之所以成为reactive,就好比如这批500件的半成品,A环节每处理完一件就可以立即推往下个环节B处理,源源不断,而不是等所有的半成品都在A环节处理再推往B环节。典型的活生生的一个生产流水线的例子。

backpressure

这样一个生产流水线,有个要求就是每个环节的处理要能够协调,就像电影起跑线里头男主角去工厂打工,流水线花花往他那边推送货物,他速度跟不上,导致货物都掉地上了,最后不得不人工关掉流水线。
在应用程序里头,如果发布者速度过快,而订阅者速度慢,那么就会数据就会堆积,控制不好就容易产生内存溢出,而backpressure就专门用来解决这个问题的。

pull模型的backpressure

@Test
    public void testPullBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

push模型的backpressure

借助线程相关的操作符,比如timeout(),delayElements(),buffer(),skip(),take()来控制数据产生速度。

delayElements

@Test
    public void testPushBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .delayElements(Duration.ofMillis(200))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

输出实例

19:37:00.870 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:37:01.117 [parallel-1] INFO com.example.demo.FluxTest - subscribe:1
19:37:03.326 [parallel-2] INFO com.example.demo.FluxTest - subscribe:2
19:37:05.535 [parallel-3] INFO com.example.demo.FluxTest - subscribe:3
19:37:07.743 [parallel-4] INFO com.example.demo.FluxTest - subscribe:4
19:37:09.953 [parallel-5] INFO com.example.demo.FluxTest - subscribe:5
19:37:12.156 [parallel-6] INFO com.example.demo.FluxTest - subscribe:6
19:37:14.363 [parallel-7] INFO com.example.demo.FluxTest - subscribe:7
19:37:16.568 [parallel-8] INFO com.example.demo.FluxTest - subscribe:8
19:37:18.775 [parallel-1] INFO com.example.demo.FluxTest - subscribe:9

这是个delayElements的例子,可以看到数据不丢失,但是延时是生产延时+消费延时

sample

@Test
    public void testSampleBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .sample(Duration.ofMillis(1000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

输出实例

19:48:40.516 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:48:40.544 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:48:40.546 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:48:40.770 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:48:40.974 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:48:41.175 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:48:41.378 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:48:41.543 [parallel-1] INFO com.example.demo.FluxTest - subscribe:4
19:48:41.583 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:48:41.785 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:48:41.989 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:48:43.547 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:48:43.548 [parallel-1] INFO com.example.demo.FluxTest - subscribe:8
19:48:43.751 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:48:43.952 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)

可以看到,由于订阅者速度慢,导致部分数据被丢弃

buffer

@Test
    public void testBufferBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
//                .log()
                .delayElements(Duration.ofMillis(200))
                .buffer(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

输出实例

19:55:06.680 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
19:55:06.712 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
19:55:06.714 [main] INFO reactor.Flux.Range.1 - | onNext(1)
19:55:06.940 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
19:55:07.141 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
19:55:07.343 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
19:55:07.509 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[1, 2, 3]
19:55:07.545 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
19:55:07.748 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
19:55:07.951 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
19:55:08.156 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(8)
19:55:09.512 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[4, 5, 6, 7]
19:55:11.515 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(9)
19:55:11.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[8]
19:55:11.719 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(10)
19:55:11.923 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(11)
19:55:12.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(12)
19:55:12.330 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(13)
19:55:12.533 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(14)
19:55:12.735 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(15)
19:55:12.941 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(16)
19:55:13.516 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[9, 10, 11, 12, 13, 14, 15]
19:55:15.517 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(17)
19:55:15.517 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[16]
19:55:15.721 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(18)
19:55:15.925 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(19)
19:55:16.127 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(20)
19:55:16.331 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(21)
19:55:16.537 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(22)
19:55:16.738 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(23)
19:55:16.942 [parallel-8] INFO reactor.Flux.Range.1 - | onNext(24)
19:55:17.519 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[17, 18, 19, 20, 21, 22, 23]
19:55:19.522 [parallel-1] INFO reactor.Flux.Range.1 - | onNext(25)
19:55:19.522 [parallel-1] INFO com.example.demo.FluxTest - subscribe:[24]

将每个800ms内产生的数据堆积为一批次推送给订阅者

skip

@Test
    public void testSkip() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .skip(Duration.ofMillis(800))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

输出实例

20:02:07.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:02:07.606 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:02:07.608 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:02:07.815 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:02:08.016 [parallel-3] INFO reactor.Flux.Range.1 - | onNext(3)
20:02:08.218 [parallel-4] INFO reactor.Flux.Range.1 - | onNext(4)
20:02:08.421 [parallel-5] INFO com.example.demo.FluxTest - subscribe:4
20:02:10.425 [parallel-5] INFO reactor.Flux.Range.1 - | onNext(5)
20:02:10.631 [parallel-6] INFO com.example.demo.FluxTest - subscribe:5
20:02:12.635 [parallel-6] INFO reactor.Flux.Range.1 - | onNext(6)
20:02:12.840 [parallel-7] INFO com.example.demo.FluxTest - subscribe:6
20:02:14.843 [parallel-7] INFO reactor.Flux.Range.1 - | onNext(7)
20:02:15.049 [parallel-8] INFO com.example.demo.FluxTest - subscribe:7

通过skip指定跳过最初一个时间段内产生的数据

take

@Test
    public void testTakeBackpressure() throws InterruptedException {
        Flux.range(1, 1000)
                .log()
                .delayElements(Duration.ofMillis(200))
                .take(Duration.ofMillis(4000))
                .subscribe(e -> {
                    LOGGER.info("subscribe:{}",e);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                });
        Thread.sleep(100*1000);
    }

输出实例

20:05:08.366 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:05:08.419 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:05:08.422 [main] INFO reactor.Flux.Range.1 - | onNext(1)
20:05:08.629 [parallel-2] INFO com.example.demo.FluxTest - subscribe:1
20:05:10.633 [parallel-2] INFO reactor.Flux.Range.1 - | onNext(2)
20:05:10.835 [parallel-3] INFO com.example.demo.FluxTest - subscribe:2
20:05:12.418 [parallel-1] INFO reactor.Flux.Range.1 - | cancel()

通过take表示只推送前面几个或前面一段时间产生的数据给订阅者

小结

reactive streams对于具有多个阶段的数据处理来说,非常有用,可以节省很多时间,另外又有backpressure来控制订阅者速度过慢的问题,非常值得使用。

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 前言 如果你对RxJava1.x还不是了解,可以参考下面文章。 1. RxJava使用介绍 【视频教程】2. Rx...
    jdsjlzx阅读 21,162评论 3 78
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,939评论 25 707
  • 已经记不起,当初说爱你的模样;开始忘了,和你相爱的那些日子。 01 阳光明媚的午后,有个久未谋面的朋友来看我。我们...
    月亮小姐6阅读 1,368评论 4 25
  • 清明的艾草青团试吃报告 采蝶轩 蛋黄肉松味 粘度不高 Q弹爽口 不油腻 甜度适中 推荐度:4星
    糖小小喵阅读 528评论 0 0