相关的maven依赖
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Reactor [riˈæktə(r)]
有两种类型:
- Flux<T>和Mono<T>。Flux可以触发0到多个事件,并根据实际情况结束或触发错误。
- Mono最多触发一个事件,所以可以把Mono<Void>用于在异步任务完成时发出通知。
反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。
在传统编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次调用者通过next()方法来获取序列的下一个值。反应试流采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。
在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。
背压(backpressure)
在基本的消息推送模式中,当消息发布者产生的数据速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的崩溃,所产生的的级联效应甚至可能操作整个系统的崩溃。背压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过request()方法来声明其一次所能够处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次request()方法的调用。这实际上变成了推拉结合的模式。
Flux和Mono
Flux和Mono是Reactor中两个基本概念。
Flux表示是包含0到N个元素的异步队列。在该序列中可以包含三种不同的类型消息通知:正常的包含元素的消息,序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法:onNext()
,onComplete()
,onError()
会被调用。
Mono表示0或者1个元素的异步队列,该队列中同样包含于Flux相同的三种类型的消息通知。Flux和Mono之间可以进行转换。对一个Flux序列进行计数操作,得到的结果是一个Mono对象。把两个Mono序列合并在一起,得到的是一个Flux对象。
2. 创建Flux
2.1 Flux的静态方法
第一种方法是通过Flux类的静态方法
just() : 指定序列中包含的所有元素。创建出来的Flux序列在发布这些元素之后会自动结束。
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
empty():创建一个不包含任何元素,只发布结束消息的序列。
error(Throwable error):创建一个只包含错误消息的序列。
never():创建一个不包含任何消息通知的序列。
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
清单1:
Flux.just("Hello", "World");
Flux.fromArray(new Integer[] {1, 2, 3});
Flux.empty();
Flux.range(1, 10);
Flux.interval(Duration.ofMillis(1000)).toStream().forEach(System.out::println);
上面这些静态方法适合简单的序列生成,当序列生成需要复杂逻辑时,则使用generate()或者create()方法。
注:清单1中5按照1000ms的间隔生成序列,toStream()方法将Flux序列转换成jdk8中的Stream对象,在通过forEach方法进行输出。这是因为序列是异步生成的,而转换成Stream对象可以保证主线程在序列生成完成之前不会退出,从而可以正确的输出所有的元素。
2.2 generate()方法
generate()方法通过同步和逐一的方式来产生Flux序列,序列的产生时通过调用所提供的SynchronousSink对象的next()、complete()和error(Throwable)方法来完成的。逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。
在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate()方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。
public Flux<String> getFlux(){
//请求参数,泛型便是返回值或者操作的某个值
return Flux.generate(synchronousSink -> {
synchronousSink.next("hello");
synchronousSink.complete();
});
// Flux.generate(new Consumer<SynchronousSink<String>>() {
// @Override
// public void accept(SynchronousSink<String> synchronousSink) {
// synchronousSink.next("hello");
// synchronousSink.complete();
// }
// });
}
2.3 create()方法
create()方法和generate()方法的不同之处在于使用的是FluxSink对象。FluxSink支持同步和异步的消息产生,并且在一次调用中产生多个元素。
public Flux<String> getFluxSink() {
return Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i + " ");
fluxSink.complete();
});
}
3. 创建Mono
Mono创建方式与之前介绍的Flux类似,Mono类中也包含了与Flux类中相同的静态方法。这些方法包括just()、never()、empty()。还有一些独有的方法。
分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()
创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
delay(Duration duration)和 delayMillis(long duration)
创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
ignoreElements(Publisher<T> source)
从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)
举例:
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
4. 操作符
Reactor强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。
4.1 buffer和bufferTimeout
把当前流中的元素收集到集合(List)中,并把集合对象作为流中新的元素。在进行收集的同时可以指定不同的条件:所包含的元素最大数量或收集的时间间隔。
方法 buffer()仅使用一个条件(集合包含对象的最大数量),而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数。(集合包含对象的最大数量或者是最大的时间间隔)
除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。
buffer定义元素的最大数量
@Test
public void testBuffer(){
Flux.range(1,100).buffer(20).subscribe(System.out::println);
}
输出结果:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
[61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
[81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
bufferUntil
每当遇到一个偶数就会结束当前的收集。
@Test
public void testBufferUntil() {
Flux.range(0, 10).bufferUntil(integer -> integer % 2 == 0).subscribe(System.out::println);
}
输出结果:
[0]
[1, 2]
[3, 4]
[5, 6]
[7, 8]
[9]
bufferWhile
数组里面包含的只有偶数
@Test
public void testBufferWhile() {
Flux.range(0, 10).bufferWhile(integer -> integer % 2 == 0).subscribe(System.out::println);
}
输出结果:
[0]
[2]
[4]
[6]
[8]
bufferTimeout(int maxSize, Duration maxTime)
Flux.interval(Duration.ofMillis(100))
.bufferTimeout(20,Duration.ofMillis(1000)).take(2)
.toStream().forEach(System.out::println);
每间隔100ms生成一个序列号(1,2,3...n)。集合最大长度为20,或者集合收集时间1000ms(此处是到达maxTime后便终止收集)。take(2)是获取的集合数量。
输出结果:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
4.2 filter
对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。
代码中语句输出1到10中的所有偶数。
@Test
public void testFilter() {
Flux.range(1, 10).filter(integer -> integer % 2 == 0).subscribe(System.out::println);
}
4.3 window
window操作符的作用类似于buffer,所不同的就是window操作符是把当前流中的元素收集到另外的Flux序列中,因此返回值类型是Flux<Flux>。
@Test
public void testWindow(){
Flux.range(1,100).window(20).subscribe(System.out::println);
Flux.interval(Duration.ofMillis(100)).window(Duration.ofMillis(1001)).take(2).toStream().forEach(System.out::println);
}
两行语句的输出结果分别是 5 个和 2 个 UnicastProcessor 字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,而 UnicastProcessor 类的 toString 方法输出的就是 UnicastProcessor 字符。
4.4 zipWith
zipWith操作符把当前流中的元素与另外一个流中的元素按照1对1的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流。
-
BiFunction
函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。
@Test
public void testZipWith() {
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
System.out.println();
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) ->
String.format("%s-%s", s1, s2)
).subscribe(System.out::println);
}
输出结果
[a,c]
[b,d]
a-c
b-d
4.5 take
take系统操作符用来从当前流中提取元素,提取的方式可以有很多种。
按照指定的数量或时间间隔来提取。
take(long n),take(Duration timespan)
提取流中的最后 N 个元素
takeLast(long n)
提取元素直到 Predicate 返回 true
takeUntil(Predicate<? super T> predicate)
当 Predicate 返回 true 时才进行提取
takeWhile(Predicate<? super T> continuePredicate)
提取元素直到另外一个流开始产生元素
takeUntilOther(Publisher<?> other)
takeWhile返回true时才会进行提取
@Test
public void testTake(){
Flux.range(0,100).takeWhile(integer -> integer<10).subscribe(System.out::println);
}
提取元素直到返回true
@Test
public void testTaskUntil(){
Flux.range(0,100).takeUntil(integer -> integer==10).subscribe(System.out::println);
}
4.6 reduce和reduceWith
reduce和reduceWith操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的Mono序列。累积操作是通过一个BiFunction来表示。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
对流中的元素进行相加操作,结果为5050;
@Test
public void testReduce() {
//第一个参数类型,第二个参数类型,返回结果类型
Flux.range(1, 100).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
}).subscribe(System.out::println);
//lambda表达式
Flux.range(1,100).reduce(((integer, integer2) -> {
return integer+integer2;
})).subscribe(System.out::println);
}
同样也是进行相加操作,不过通过一个Supplier给出初始值为100,所以结果为5150。
@Test
public void testReduceWith(){
Flux.range(1,100).reduceWith(new Supplier<Integer>(){
@Override
public Integer get() {
return 100;
}
},(x,y)->x+y).subscribe(System.out::println);
//lambda
Flux.range(1,100).reduceWith(()->"100",(x,y)->x+y).subscribe(System.out::println);
}
4.7 merge和mergeSequential
merge和mergeSequential操作符用来把多个流合并成一个Flux序列。不同的是merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
进行合并的流都是每隔100ms产生一个元素,不过第二个流每个元素的产生都第一个流要延迟50ms。
在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;
@Test
public void testMerge() {
Flux.merge(Flux.interval(Duration.ofMillis(100)).take(5), Flux.interval(Duration.ofMillis(50),Duration.ofMillis(100)).take(5))
.toStream().forEach(System.out::println);
}
使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。
@Test
public void testMergeSequential(){
Flux.mergeSequential(Flux.interval(Duration.ofMillis(100)).take(5), Flux.interval(Duration.ofMillis(50),Duration.ofMillis(100)).take(5))
.toStream().forEach(System.out::println);
}
4.8 flatMap和flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。
流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。
@Test
public void testFlat() {
//第一个流输出5个元素,第二个流输出10个元素。
Flux.just(5, 10).flatMap(x -> Flux.interval(Duration.ofMillis(x * 10), Duration.ofMillis(100)).take(x))
.toStream().forEach(System.out::println);
}
4.9 concatMap(concatenate)
concatMap操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与flatMap不同的是,concatMap会根据原始流中的元素按顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。
流中依次包含了第一个流和第二个流中的全部元素。
@Test
public void testConcatMap() {
//第一个流输出5个元素,第二个流输出10个元素。
Flux.just(5, 10).concatMap(x -> Flux.interval(Duration.ofMillis(x * 10), Duration.ofMillis(100)).take(x))
.toStream().forEach(System.out::println);
}
4.10 combineLatest
combineLatest操作符把所有流中最新产生的元素合并为一个元素,作为返回结果流中的元素。只要其中任何一个流产生了元素,合并操作都会被执行一次。
(若是一个流未产生元素,另一个流产生元素,不会进行合并操作)。
流最新产生的元素会被收集到一个数组中,通过Array.toString方法来把数组转换为String类型。
@Test
public void testCombineLatest() {
Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.ofMillis(100)).take(5),
Flux.interval(Duration.ofMillis(1000)).take(5)
).toStream().forEach(System.out::println);
}
输出结果
[4, 0]
[4, 1]
[4, 2]
[4, 3]
[4, 4]
5. 消息处理
当需要处理Flux和Mono消息时,如之前的代码逻辑,可以通过subscribe方法来添加相应的订阅逻辑。在调用subscribe方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。
通过subscribe()方法处理正常和错误消息:
@Test
public void testMonoError(){
Flux.just(1,2).concatWith(Mono.error(new IllegalAccessError())).subscribe(System.out::println,System.out::println);
}
5.1 错误的不同处理策略
1. 使用onErrorReture()方法返回默认值。
出现错误时,会返回默认值0;
@Test
public void testOnErrorReturn(){
Flux.just(1,2).concatWith(Mono.error(new NullPointerException())).onErrorReturn(0).subscribe(System.out::println);
}
2. 使用onErrorResume根据不同的异常类型来产生不同的流
@Test
public void testOnErrorResume() {
Flux.just(1, 2)
.concatWith(Mono.error(new NullPointerException()))
.onErrorResume(e->{
if(e instanceof IllegalStateException){
return Mono.justOrEmpty(0);
}else if(e instanceof NullPointerException){
return Mono.just(1);
}
return Mono.empty();
}).subscribe(System.out::println);
}
5.2 重试机制
当出现错误时,还可以通过 retry 操作符来进行重试。重试的动作是通过重新订阅序列来实现的。
在使用 retry 操作符时可以指定重试的次数。代码中指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。
@Test
public void testRetry(){
Flux.just(1,2).concatWith(Mono.error(new NullPointerException()))
.retry(1).subscribe(System.out::println);
}
6. 调度器
通过调度器(Scheduler)可以指定这些操作的方式和所在的线程,有下列几种不同的调度器实现:
方法 | 描述 |
---|---|
Schedulers.immediate() | 当前线程 |
Schedulers.single() | 单一可复用线程 |
Schedulers.elastic() | 适用于IO操作,弹性线程池,线程是可以复用的,线程闲置太久,则被销毁 |
Schedulers.parallel() | 并行操作优化的线程池,线程此行数量取决于CPU数量,适合计算密集型任务 |
Schedulers.timer() | 支持任务调度 |
Schedulers.fromExecutorService() | 从已有的ExecutorService对象创建调度器 |
注:某些操作符默认就已经使用了特定类型的调度器,比如interval()方法的创建的流使用了Schedulers.timer()创建的调度器。
使用create()方法创建了一个新的Flux对象,其中包含的唯一元素就是当前线程的名字。借助是publishOn方法——切换执行时的调度器,map方法——把当前线程名称作为前缀添加,subscribeOn方法——改变Flux流订阅时的执行方式。
@Test
public void testScheduler(){
Flux.create(fluxSink -> {
fluxSink.next("Flux:"+Thread.currentThread().getName());
fluxSink.complete();
//x应该就是Flux中的元素
}).publishOn(Schedulers.single()).map(x->String.format("[%s]-%s",Thread.currentThread().getName(),x))
.publishOn(Schedulers.elastic()).map(x->String.format("[%s]-%s",Thread.currentThread().getName(),x))
.subscribeOn(Schedulers.parallel())
.toStream().forEach(System.out::println);
}
结果:
[elastic-2]-[single-1]-Flux:parallel-1
需要注意的是:
@Test
public void testScheduler(){
Flux.create(fluxSink -> {
fluxSink.next("Flux:"+Thread.currentThread().getName());
fluxSink.complete();
//x应该就是Flux中的元素
}).publishOn(Schedulers.single()).map(x->String.format("[%s]-%s",Thread.currentThread().getName(),x))
.publishOn(Schedulers.elastic()).map(x->String.format("[%s]-%s",Thread.currentThread().getName(),x))
// .subscribeOn(Schedulers.parallel())
.toStream().forEach(System.out::println);
}
输出结果:
[elastic-2]-[single-1]-Flux:main
测试
对Rector的代码进行测试时,需要加入下面依赖。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
1. 使用 StepVerifier
1.1 验证序列
验证异步序列Flux中锁包含的元素是否符合预期,StepVerifier的作用可以对序列中包含的元素进行逐一验证。
- StepVerifier.create()方法:对一个流进行包装之后再进行验证。
- expectNext()方法:用来声明测试时所期待的流中的下一个元素的值。
- verifyComplete()方法:则验证流是否正常结束。
- verifyError()方法:来验证流由于错误而终止。
@Test
public void testStepVerifier(){
StepVerifier.create(Flux.just("a","b")).expectNext("a").expectNext("b").verifyComplete();
}
1.1 虚拟时间
有些序列的生成是有时间要求的,比如1分钟才产生一个新的元素。在进行测试中,不可能花费实际的时间来等待每一个元素的生成。此时需要用到StepVerifier提供的虚拟时间功能。
需要验证的流中包含两个产生间隔为一天的元素,并且第一个元素产生的延迟时间为4个小时。在通过 StepVerifier.withVirtualTime()方法包装流之后,expectNoEvent()方法用来验证4个小时之内没有任何消息产生,然后验证第一个元素0的产生;thenAwait()方法来让虚拟时钟前进一天,然后验证第二个元素1的产生;最后验证流正常结束。
@Test
public void testVirtualTime(){
StepVerifier.withVirtualTime(()->Flux.interval(Duration.ofHours(4),Duration.ofDays(1)).take(2))
.expectSubscription().expectNoEvent(Duration.ofHours(4))
.expectNext(0L)
.thenAwait(Duration.ofDays(1))
.expectNext(1L)
.verifyComplete();
}
2. 使用TestPublisher
TestPublisher的作用在于可以控制流中元素的产生,甚至违反反应流规范的情况下。
通过 create()方法创建一个新的 TestPublisher 对象,然后使用 next()方法来产生元素,使用 complete()方法来结束流。TestPublisher 主要用来测试开发人员自己创建的操作符。
@Test
public void testTestPublisher(){
TestPublisher<String> publisher = TestPublisher.create();
publisher.next("a");
publisher.next("b");
publisher.complete();
StepVerifier.create(publisher).expectNext("a").expectNext("b").expectComplete();
}
使用检测点
通过checkpoint操作符来对特定的流处理链启用调试模式。
例:在 map 操作符之后添加了一个名为 test 的检查点。当出现错误时,检查点名称会出现在异常堆栈信息中。对于程序中复杂或重要的流处理链,可以在关键的位置上启用检测点来帮助定位问题。
@Test
public void testCheckPoint() {
Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);
}
日志记录
在开发和调试中的另外一项实用功能是把流相关的事件记录在日志中。这可以通过添加 log 操作符来实现。
添加了 log 操作符并指定了日志分类的名称。
@Test
public void testLog(){
Flux.just(1,0).log("Range").subscribe(System.out::println);
}
对应输出:
09:46:31.864 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:46:31.893 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
09:46:31.898 [main] INFO Range - | request(unbounded)
09:46:31.899 [main] INFO Range - | onNext(1)
1
09:46:31.899 [main] INFO Range - | onNext(0)
0
09:46:31.900 [main] INFO Range - | onComplete()
"冷" 与 "热" 序列
之前代码中所创建的均是冷序列:即不论订阅者在何时订阅该序列,总能收到序列中产生的全部消息。
而对应的热序列:则是持续不断的产生消息,订阅者只能获取到在其订阅之后产生的消息。
原始序列是中包含了10个间隔为1s的元素。
- publish()方法:把一个Flux对象转化为ConnectableFlux对象。
- autoConnect()方法:当ConnectionFlux对象有一个订阅者时就开始产生消息。
- source.subscribe():订阅该ConnectionFlux对象,让产生消息。
然后是当前线程睡眠5s。
第二个订阅者此时只能获取到该序列中的后5个元素。
@Test
public void testAutoConnect() throws InterruptedException {
Flux<Long> source = Flux.interval(Duration.ofMillis(1000)).take(10).publish().autoConnect();
source.subscribe();
Thread.sleep(5000);
source.toStream().forEach(System.out::println);
}
输出结果:
5
6
7
8
9