响应式框架reactor3的 使用其一

响应式编程(Reactive programming) 是使用异步数据流(asynchronous data streams)进行编程。
特性:

  • 异步编程: 提供了合适的异步编程模型,能够挖掘多核CPU的能力、提高效率、降低延迟和阻塞等。
  • 数据流: 基于数据流模型,响应式编程提供一套统一的Stream风格的数据处理接口。和Java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
  • 变化传播: 简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。
    其他概念:背压
    背压: Backpressure 这个概念源自工程概念中的 Backpressure:在管道运输中,气流或液流由于管道突然变细、急弯等原因导致由某处出现了下游向上游的逆向压力,这种情况称作「back pressure」。这是一个很直观的词:向后的、往回的压力。
    程序中解释: 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。需要强调的是:这句话的重点不在于「上游生产速度大于下游消费速度」,而在于「Buffer 溢出」。
    在reactor3 中的体现:
        Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
                .log() // 打印日志
                //.limitRate(100) // 告诉上游我只能缓存100个 超过100个就会抛异常,下游每次消费100个后会告诉上游继续发送数据
                .onBackpressureBuffer(2000) // 设置背压缓存策略并设置缓存大小 当缓存超过2000 个就抛异常 这个方法会覆盖掉 limitRate
                .concatMap(x -> {
                    System.out.println(x);
                 return   Mono.delay(Duration.ofMillis(100));}) // concatMap 表示将上游的数据组装成一个flux
                .blockLast(); // 无限阻塞 知道执行完最后一条数据

上面代码回抛出异常如下:

19:57:58.204 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2015)
18
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2016)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2017)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2018)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2019)
19:57:58.222 [parallel-1] INFO reactor.Flux.Interval.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
    at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:170)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2497)
        at FluxCreateTest.main(FluxCreateTest.java:17)

从打印的日志可以看出 concatMap 这个方法已经执行了 18条数据 ,背压缓存里缓存了2000 条数据。在2019 这里超出限制 2019-18 = 2001>2000 所以抛出异常

下面简单介绍下 reactor3 相关方法
Flux & Mono 这两个类

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种信号的特点:
  • 错误信号和完成信号都是终止信号,不能共存;
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
  • 如果没有错误信号,也没有完成信号,表示是无限数据流;

Flux<T> 是一个标准的 Publisher<T>,表示为发出0到N个元素的异步序列;
Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素,可以被 onComplete 或 onError 信号选择性终止;

        Mono.create(monoSink -> {
            monoSink.success(11); // 因为Mono 表示单数据 因此包含了complete() 方法
           monoSink.error(new RuntimeException("这是个异常")); // 这条数据不会执行
        })
        Flux.create(fluxSink -> {
           fluxSink.next("value"); // 元素
           fluxSink.complete();  // 完成         //个人认为 这里相当于 monoSink.Success() = next()+complete(); 
           fluxSink.error( new RuntimeException()); // 异常
        });

上面是数据的创建者下面还有写常用的数据创建者

// 将list 作为数据源添加到flux
Flux.fromIterable(List.of(1,2,3))
// 将流作为数据源
Flux.fromStream(List.of(1,2,3).stream());
// 立即创建一个数据源 饿汉模式
Flux.just(List.of());
// 对应的就有懒汉模式 这里每次调用就会返回一个
Flux.defer(()->Flux.just(1));

// 如果我想定时的创造数据流怎么办
Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
// 当我的数据生产者不止一个怎么做
Flux.create();

Mono.

订阅者

// 阻塞的
Mono.just(1).block();
// 非阻塞的
Mono.just(1).subscribe();
// flux 非阻塞的
Flux.just(1).subscribe();
// flux 阻塞 第一条数据
Flux.just(1).blockFirst();
// 阻塞最后一条数据
Flux.just(1).blockLast();



使用flux 实现文件的读写

        Flux.fromStream(Files.lines(Paths.get("17336.txt")))
                .log()
                .subscribe(new BaseSubscriber<String>() {
                    BufferedWriter bufferedWriter = Files.newBufferedWriter(Paths.get("凡人修仙传.txt"));
                    @Override
                    @SneakyThrows
                    protected void hookOnNext(String value) {
                        super.hookOnNext(value);
                        bufferedWriter.write(value+"\n");
                        if(value.contains("------------")){ // 每写入一个章节就将数据写入文件并刷新缓存
                            bufferedWriter.flush();
                        }
                    }

                    @Override
                    @SneakyThrows
                    protected void hookOnComplete() {
                        super.hookOnComplete();
                        bufferedWriter.close();
                    }
                });

冷数据发布者VS热数据发布者

首先来看定义:
冷数据发布者: 在向订阅者发布数据的时候都会从起始位置开始,如果没有订阅者就不会做任何事情。
热数据发布者:冷数据和热数据相反,即当一个新的订阅者来订阅流的时候会在最新的位置开始发送数据。

热数据发布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        ConnectableFlux hotFlux = coldFlux.publish();
        hotFlux.subscribe((s)->{
            System.out.println(s);
        });
        hotFlux.connect();
        Thread.sleep(6000);
        hotFlux.subscribe(s->{
            System.out.println(s);
        });
        Thread.sleep(1000000);

执行结果

16:53:10.474 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:53:10.495 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:53:10.498 [main] INFO reactor.Flux.Interval.1 - request(256)
16:53:12.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
热数据第一个订阅者:0
16:53:12.519 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
热数据第一个订阅者:1
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
热数据第一个订阅者:2
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
热数据第一个订阅者:3
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:20.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
热数据第一个订阅者:4
热数据第二个发布者:4
16:53:20.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:22.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
热数据第一个订阅者:5
热数据第二个发布者:5
冷数据发布示例
        Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
        coldFlux.subscribe((s)->{
            System.out.println("第一个订阅者:"+s);
        });
        Thread.sleep(6000);
        coldFlux.subscribe(s->{
            System.out.println("第二个订阅者:"+s);
        });
        Thread.sleep(1000000);

执行结果

16:36:52.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:36:52.806 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:52.807 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:36:54.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:0
16:36:56.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
第一个订阅者:1
16:36:58.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
第一个订阅者:2
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:37:00.819 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
16:37:00.819 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:3
第二个订阅者:0
16:37:02.811 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
第一个订阅者:4
16:37:02.826 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
第二个订阅者:1
16:37:04.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
第一个订阅者:5
16:37:04.827 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
第二个订阅者:2

异步与并行

讲到这里目前看到当前调用的方法基本上都是同步的,除了Flux.Interval()。但是interavl 这个方法局限性太大,那么有没有让整个流异步的方法呢?
先看代码:

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 161, 17, 18, 19, 20, 21, 22);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(2)
                .runOn(Schedulers.parallel()) // 执行
                .log()
                .subscribe(System.out::println);
        //Thread.sleep(1_000_000);

执行结果

17:32:52.962 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:32:52.990 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.991 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)

Process finished with exit code 0

看到执行结果可以判断这段代码是异步非阻塞的,相关的日志并没有被打印出来
下面将线程休眠注释关掉 执行结果

        List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        Flux.fromStream(facebookAccountList.stream())
                .parallel(8)
                .runOn(Schedulers.parallel())
                .log()
                .subscribe(System.out::println);
        Thread.sleep(1_000_000);

执行结果:

16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onNext(1)
1
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onNext(2)
2
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onNext(3)
3
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onNext(4)
4
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onNext(5)
5
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onNext(7)
7
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onNext(6)
6
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onNext(8)
8
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onComplete()

上述代码设计到两个方法 parallel(),runOn() ;其中parallel() 这个方法 我在看国内的文章里说这个是开启异步的方法,但是我在看国外的文章与相关的文档的时候并没有说他是开启异步的方法。而是开启并行的方法。那这里就有歧义,那这里就需要验证下;看如下代码:

        Flux.range(1,10).parallel(3).log().subscribe(System.out::println);

执行结果:

10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(1)
1
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(2)
2
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(3)
3
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(4)
4
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(5)
5
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(6)
6
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(7)
7
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(8)
8
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(9)
9
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(10)
10
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()

执行结果发现并不是异步的,那这个方法到底是干什么的?
其实文档里说的很明白了,就是并行,他的作用就是拓宽通道,本来只有一个通道的,在我使用parallel(3)方法的时候通道变为3 个, 之后订阅者订阅这三个通道。


image.png

publishOn VS subscribeOn 后续有机会再说

相关文档:
背压解释
官方文档
publishOn 和 subscribeOn

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容