作者:
瑾年
微信公众号: Java知音
本章节介绍:Flux和Mono操作符
和其他主流的响应式编程一样,Reactor框架的设计目标也是为了简化相应式流的使用方法。为此Reactor框架提供了大量操作符用于操作Flux和Mono对象。
本节不打算全面详细介绍,我们的思路是将这些操作符分类,然后对每一类中具有代表性的操作符展开讨论。
对于其他没有介绍到的操作符可参考Reactor框架的官方文档。
在本节中我们把Flux和Mono操作符分为以下7大类。
- 转换 (Transforming)操作符负责对序列中的元素进行转变。
- 过滤 (Filtering)操作符负责将不需要的数据从序列中进行过滤。
- 组合 (Combining) 操作符负责将序列中的元素进行合并和连接。
- 条件 (Conditional) 操作符负责根据特定条件对序列中的元素进行处理。
- 数学 (Mathematical) 操作符负责对序列中的元素执行各种数学操作。
- Obserable工具(Utility) 操作符提供的是一些针对流失处理的辅助性工具。
- 日志和调试(Log&Debug) 操作符提供了针对运行时日志以及如何对序列进行代码调试的工具类。
1. 转换操作符
Reactor框架中常用的转换操作符包括buffer、map、flatMap和window。
(1)buffer
buffer操作符把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。使用buffer操作符在进行元素收集时可以指定集合对象所包含的元素的最大数量。
以下代码先使用range()方法创建了1~50这50个元素,然后演示了使用buffer从包含这50个元素的流中构建集合,每个集合包含10个元素,一共构建5个集合。
Flux.range(1,50).buffer(10).subscribe(System.out::println);
上面代码执行结果如下:
[1,2,3,4,5,6,7,8,9,10]
[11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30]
[31,32,33,34,35,36,37,38,39,40]
[41,42,43,44,45,46,47,48,49,50]
buffer操作符的另一种用法是指定收集的时间间隔,由此演变出了bufferTimeout()方法。bufferTimeout()方法可以指定时间间隔为一个Duration对象或者毫秒数,即使用bufferTimeoutMillis()或者bufferMillis()这两个方法。
除了指定元素数量和时间间隔,还可以通过bufferUnitl和bufferWhile操作符进行数据收集。bufferUnitl会一直收集,知道断言(Predicate)条件返回true。使得断言条件返回true的那个元素可以选择添加到当前集合或者下一个集合当中。
而bufferWhile只有当断言条件返回true时才会收集,一旦值为false,会立即开始下一次收集。
代码如下:
Flux.range(1,10).bufferUnitl(i -> i%2 ==0).subscribe(System.out::println);
System.out.println("-----------------------------------");
Flux.range(1,10).bufferWhile(i -> i%2 ==0).subscribe(System.out::println);
以上代码执行结果如下:
[1,2]
[3,4]
[5.6]
[7,8]
[9,10] -------------------------- [2]
[4]
[6]
[8]
[10]
(2)map
map操作符相当于一种映射操作,他对流中每一个元素映射一个函数,从而达到一个变幻效果。
Flux.just(1, 2, 3, 4)
.log()
.map(i -> { try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} return i * 2;
})
.subscribe(e -> log.info("get:{}",e));
以上代码运行结果:
10:53:57.058 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
10:53:57.062 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
10:53:57.063 [main] INFO reactor.Flux.Array.1 - | onNext(1) 10:53:58.067 [main] INFO com.example.demo.FluxTest - get:2
10:53:58.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 10:53:59.071 [main] INFO com.example.demo.FluxTest - get:4
10:53:59.071 [main] INFO reactor.Flux.Array.1 - | onNext(3) 10:54:00.076 [main] INFO com.example.demo.FluxTest - get:6
10:54:00.076 [main] INFO reactor.Flux.Array.1 - | onNext(4) 10:54:01.080 [main] INFO com.example.demo.FluxTest - get:8
10:54:01.081 [main] INFO reactor.Flux.Array.1 - | onComplete()
(3) flatMap
与map不同,操作符把流中的每一个元素转换成一个流,再把转换之后得到的所有流中的元素进行合并。
flatMap操作符非常实用,代码示例如下:
Flux.just(1,5).flatMap(x -> Mono.just(x*x)).subscribe(System.out::println);
以上代码中我们对1和5这两个元素进行了flatMap操作,操作的结果是返回他们的平方值进行合并,执行结果如下:
1
25
在系统开发过程中我们经常会碰到对数据库中的数据进行逐一处理的场景,这时候可以充分利用flatMap操作符的特性开展相关操作。以下代码展示了如何使用flatMap对数据库中获取的数据进行逐一删除的方法。
Mono<void> deleteFiles = fileRepository.findByname(flieName).flatMap(fileRepository::delete);
(4)window
window操作符类似于buffer,所不同的是,window操作符是把当前流中的元素收集到另一个Flux序列中。因此它的返回值类型是Flux<flux>,而不是简单的Flux。
示例代码:
Flux.range(1,5).window(2).toIterable().forEach(w -> {
w.subscribe(System.out::println);
System.out.println("---------------------------")
})
以上代码执行结果如下。这里生成了5个元素,然后通过window操作符把这个5个元素转变成3个Flux对象,并通过forEach()工具把这些对象打印出来。
1
2
-----------------
3
4
-----------------
5
2.过滤操作符
Reactor中 常用过操作符包括filter、first、last、skip/skipLast、take/takeLast等。
(1)filter
filter操作符的含义与普通的过滤器类似,就是对流中包含的元素进行过滤,只是留下满足指定条件的元素。
例如,我们想对1~10这10个元素进行过滤,只获取能被2取余的元素,可以使用如下代码。
Flux.range(1,10).filter(i -> i % 2 ==0).subscribe(System.out::println);
(2)first
first操作符的执行效果即为返回流中的第一个元素。
(3)last
last操作符与first类似,返回流中的最后一个元素。
(4)skip/skipLast
如果使用skip操作符,将会忽略数据流中的前n个元素。类似的如果使用skipLast将会忽略数据流中的后n个元素。
(5)take/takeLast
take系列操作符用来从当前流中提取元素。我们可以按照指定的数量来提取元素,对应的方法是take(long n);同时,也可以按照指定的时间间隔来提取元素,分别使用take(Duration time) 和takeMillis(long time)。类似的takeLast操作符用来从当前流中尾部提取元素。
take和takeLast操作符示例代码如下:
Flux.range(1,100).take(10).subscribe(System.out::println);
Flux.range(1,100).takeLast(10).subscribe(System.out::println);
3. 组合操作符
Reactor中常用的组合操作符有then/when、merge、starWith和zip
(1)then/when
then操作符的含义是等到上一个操作符完成时在做下一个。
when操作符的含义是等到多个操作仪器完成。
如下代码很好的展示了when操作符的实际应用场景。
public Mono<Void> updateFiles(Flux<FilePart> files){ return files.flatMap(file ->{
Mono<Void> copyFileToFileServer =...;
Mono<Void> saFilePathToDataBase = ...; return Mono.when(copyFileToFileServer);
});
}
(2)starWith
starWith操作符的含义是在数据元素序列的开头插入指定的元素项。
(3)merge
merge操作符用来把多个流合并成一个Flux序列,该操作符按照所有流中的元素实际生产顺序合并。
merge操作符示例代码如下:
Flux.merge(Flux.intervalMillis(0,10).take(3),
Flux.intervalMillis(5,10).take(3)).toStream()
.forEach(System.out::println);
请注意这里两个Flux.intervalMillis()方法都是在限制10ms内生产一个新元素。
运行结果如下:
0
0
1
1
2
2
不同于merge,mergeSequeetial操作符则是按照所有流被订阅的顺序以流为单位进行合并。
请看如下代码:
Flux.mergeSequeetial(Flux.intervalMillis(0,10).take(3),
Flux.intervalMillis(5,10).take(3)).toStream()
.forEach(System.out::println);
我们仅仅只是将merge换成了mergeSequeetial。
运行结果如下:
0
1
2
0
1
2
(4)zipWith
zipWith把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。使用zipWith
操作符在合并时可以不做任何处理,如此得到一个元素类型为Tuple2的流,示例代码如下:
Flux.just("a","b").zipWith(Flux.just("c","b")).subscribe(System.out::println);
运行结果如下:
[a,c]
[b,d]
另外,我们还可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。
代码如下:
Flux.just("a","b").zipWith(Flux.just("c","b"),(s1,s2) -> String.format("%+%",s1,s2))
.subscribe(System.out::println);
运行结果如下:
a+c
b+d