响应式编程(Reactive programming) 是使用异步数据流(asynchronous data streams)进行编程。
特性:
- 异步编程: 提供了合适的异步编程模型,能够挖掘多核CPU的能力、提高效率、降低延迟和阻塞等。
- 数据流: 基于数据流模型,响应式编程提供一套统一的Stream风格的数据处理接口。和Java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
- 变化传播: 简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。
其他概念:背压
背压: Backpressure 这个概念源自工程概念中的 Backpressure:在管道运输中,气流或液流由于管道突然变细、急弯等原因导致由某处出现了下游向上游的逆向压力,这种情况称作「back pressure」。这是一个很直观的词:向后的、往回的压力。
程序中解释: 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。需要强调的是:这句话的重点不在于「上游生产速度大于下游消费速度」,而在于「Buffer 溢出」。
在reactor3 中的体现:
Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
.log() // 打印日志
//.limitRate(100) // 告诉上游我只能缓存100个 超过100个就会抛异常,下游每次消费100个后会告诉上游继续发送数据
.onBackpressureBuffer(2000) // 设置背压缓存策略并设置缓存大小 当缓存超过2000 个就抛异常 这个方法会覆盖掉 limitRate
.concatMap(x -> {
System.out.println(x);
return Mono.delay(Duration.ofMillis(100));}) // concatMap 表示将上游的数据组装成一个flux
.blockLast(); // 无限阻塞 知道执行完最后一条数据
上面代码回抛出异常如下:
19:57:58.204 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2015)
18
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2016)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2017)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2018)
19:57:58.219 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2019)
19:57:58.222 [parallel-1] INFO reactor.Flux.Interval.1 - cancel()
Exception in thread "main" reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
at reactor.core.publisher.FluxOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FluxOnBackpressureBuffer.java:170)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2497)
at FluxCreateTest.main(FluxCreateTest.java:17)
从打印的日志可以看出 concatMap 这个方法已经执行了 18条数据 ,背压缓存里缓存了2000 条数据。在2019 这里超出限制 2019-18 = 2001>2000 所以抛出异常
下面简单介绍下 reactor3 相关方法
Flux & Mono 这两个类
Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
三种信号的特点:
- 错误信号和完成信号都是终止信号,不能共存;
- 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
- 如果没有错误信号,也没有完成信号,表示是无限数据流;
Flux<T> 是一个标准的 Publisher<T>,表示为发出0到N个元素的异步序列;
Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素,可以被 onComplete 或 onError 信号选择性终止;
Mono.create(monoSink -> {
monoSink.success(11); // 因为Mono 表示单数据 因此包含了complete() 方法
monoSink.error(new RuntimeException("这是个异常")); // 这条数据不会执行
})
Flux.create(fluxSink -> {
fluxSink.next("value"); // 元素
fluxSink.complete(); // 完成 //个人认为 这里相当于 monoSink.Success() = next()+complete();
fluxSink.error( new RuntimeException()); // 异常
});
上面是数据的创建者下面还有写常用的数据创建者
// 将list 作为数据源添加到flux
Flux.fromIterable(List.of(1,2,3))
// 将流作为数据源
Flux.fromStream(List.of(1,2,3).stream());
// 立即创建一个数据源 饿汉模式
Flux.just(List.of());
// 对应的就有懒汉模式 这里每次调用就会返回一个
Flux.defer(()->Flux.just(1));
// 如果我想定时的创造数据流怎么办
Flux.interval(Duration.ofMillis(1)) // 每秒发送一条数据
// 当我的数据生产者不止一个怎么做
Flux.create();
Mono.
订阅者
// 阻塞的
Mono.just(1).block();
// 非阻塞的
Mono.just(1).subscribe();
// flux 非阻塞的
Flux.just(1).subscribe();
// flux 阻塞 第一条数据
Flux.just(1).blockFirst();
// 阻塞最后一条数据
Flux.just(1).blockLast();
使用flux 实现文件的读写
Flux.fromStream(Files.lines(Paths.get("17336.txt")))
.log()
.subscribe(new BaseSubscriber<String>() {
BufferedWriter bufferedWriter = Files.newBufferedWriter(Paths.get("凡人修仙传.txt"));
@Override
@SneakyThrows
protected void hookOnNext(String value) {
super.hookOnNext(value);
bufferedWriter.write(value+"\n");
if(value.contains("------------")){ // 每写入一个章节就将数据写入文件并刷新缓存
bufferedWriter.flush();
}
}
@Override
@SneakyThrows
protected void hookOnComplete() {
super.hookOnComplete();
bufferedWriter.close();
}
});
冷数据发布者VS热数据发布者
首先来看定义:
冷数据发布者: 在向订阅者发布数据的时候都会从起始位置开始,如果没有订阅者就不会做任何事情。
热数据发布者:冷数据和热数据相反,即当一个新的订阅者来订阅流的时候会在最新的位置开始发送数据。
热数据发布示例
Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
ConnectableFlux hotFlux = coldFlux.publish();
hotFlux.subscribe((s)->{
System.out.println(s);
});
hotFlux.connect();
Thread.sleep(6000);
hotFlux.subscribe(s->{
System.out.println(s);
});
Thread.sleep(1000000);
执行结果
16:53:10.474 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:53:10.495 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:53:10.498 [main] INFO reactor.Flux.Interval.1 - request(256)
16:53:12.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
热数据第一个订阅者:0
16:53:12.519 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
热数据第一个订阅者:1
16:53:14.504 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
热数据第一个订阅者:2
16:53:16.500 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
热数据第一个订阅者:3
16:53:18.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:20.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
热数据第一个订阅者:4
热数据第二个发布者:4
16:53:20.509 [parallel-1] INFO reactor.Flux.Interval.1 - request(1)
16:53:22.508 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
热数据第一个订阅者:5
热数据第二个发布者:5
冷数据发布示例
Flux coldFlux = Flux.interval(Duration.ofSeconds(2)).log();
coldFlux.subscribe((s)->{
System.out.println("第一个订阅者:"+s);
});
Thread.sleep(6000);
coldFlux.subscribe(s->{
System.out.println("第二个订阅者:"+s);
});
Thread.sleep(1000000);
执行结果
16:36:52.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:36:52.806 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:52.807 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:36:54.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:0
16:36:56.813 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(1)
第一个订阅者:1
16:36:58.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(2)
第一个订阅者:2
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - onSubscribe(FluxInterval.IntervalRunnable)
16:36:58.813 [main] INFO reactor.Flux.Interval.1 - request(unbounded)
16:37:00.819 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(3)
16:37:00.819 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(0)
第一个订阅者:3
第二个订阅者:0
16:37:02.811 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(4)
第一个订阅者:4
16:37:02.826 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(1)
第二个订阅者:1
16:37:04.812 [parallel-1] INFO reactor.Flux.Interval.1 - onNext(5)
第一个订阅者:5
16:37:04.827 [parallel-2] INFO reactor.Flux.Interval.1 - onNext(2)
第二个订阅者:2
异步与并行
讲到这里目前看到当前调用的方法基本上都是同步的,除了Flux.Interval()。但是interavl 这个方法局限性太大,那么有没有让整个流异步的方法呢?
先看代码:
List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 161, 17, 18, 19, 20, 21, 22);
Flux.fromStream(facebookAccountList.stream())
.parallel(2)
.runOn(Schedulers.parallel()) // 执行
.log()
.subscribe(System.out::println);
//Thread.sleep(1_000_000);
执行结果
17:32:52.962 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:32:52.990 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.991 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
17:32:52.993 [main] INFO reactor.Parallel.RunOn.1 - request(unbounded)
Process finished with exit code 0
看到执行结果可以判断这段代码是异步非阻塞的,相关的日志并没有被打印出来
下面将线程休眠注释关掉 执行结果
List<Integer> facebookAccountList = List.of(1, 2, 3, 4, 5, 6, 7, 8);
Flux.fromStream(facebookAccountList.stream())
.parallel(8)
.runOn(Schedulers.parallel())
.log()
.subscribe(System.out::println);
Thread.sleep(1_000_000);
执行结果:
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onNext(1)
1
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onNext(2)
2
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onNext(3)
3
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onNext(4)
4
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onNext(5)
5
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onNext(7)
7
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onNext(6)
6
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onNext(8)
8
16:36:22.105 [parallel-6] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-5] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-8] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-1] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-7] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-2] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-4] INFO reactor.Parallel.RunOn.1 - onComplete()
16:36:22.105 [parallel-3] INFO reactor.Parallel.RunOn.1 - onComplete()
上述代码设计到两个方法 parallel(),runOn() ;其中parallel() 这个方法 我在看国内的文章里说这个是开启异步的方法,但是我在看国外的文章与相关的文档的时候并没有说他是开启异步的方法。而是开启并行的方法。那这里就有歧义,那这里就需要验证下;看如下代码:
Flux.range(1,10).parallel(3).log().subscribe(System.out::println);
执行结果:
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onSubscribe(ParallelSource.ParallelSourceMain.ParallelSourceInner)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - request(unbounded)
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(1)
1
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(2)
2
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(3)
3
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(4)
4
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(5)
5
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(6)
6
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(7)
7
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(8)
8
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(9)
9
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onNext(10)
10
10:04:16.774 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
10:04:16.789 [main] INFO reactor.Parallel.Source.1 - onComplete()
执行结果发现并不是异步的,那这个方法到底是干什么的?
其实文档里说的很明白了,就是并行,他的作用就是拓宽通道,本来只有一个通道的,在我使用parallel(3)方法的时候通道变为3 个, 之后订阅者订阅这三个通道。
publishOn VS subscribeOn 后续有机会再说
相关文档:
背压解释
官方文档
publishOn 和 subscribeOn