reactor 本质上是观察者模式的使用
添加数据
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
// 添加数据 通过流的方式
Flux.fromStream(list.stream());
// 只包含当前的数据流
Flux.just("1","2");
// 空的数据流
Flux.empty();
// 手动添加数据流
Flux.create((i) -> {
i.next("1");
i.next("2");
i.next("3");
i.next("4");
});
// 添加数据通过集合的方式
Flux.fromIterable(list);
// 定时产生一个数据,当前方式是每
Flux.interval(Duration.ofMillis(100));
消费数据 subscribe方法
Flux flux = Flux.create((i) -> {
i.next("1");
i.next("2");
i.next("3");
i.next("4");
});
// 消费数据
flux.subscribe((s) -> {
System.out.println(s);
});
flux.subscribe(System.out::println)
flux.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
// System.out.println(s);
}
});
输出结果
16:37:50.894 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
1
2
3
4
Process finished with exit code 0
数据中间处理环节 do 系列接口
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
// 添加数据源
Flux.fromIterable(list)
// 每处里一个数据前 就会执行该方法
.doOnNext((n) -> {
//System.out.println("next");
})
// 执行完数据流执行该方法
.doFinally((f) -> {
// System.out.println("finally");
})
// 每处里一个数据前就会执行该方法
.doOnEach((e) -> {
//System.out.println("each");
})
//执行结束之后输出
.doOnComplete(()->{
System.out.println("complete");
})
.subscribe((System.out::println));
执行结果
17:00:42.399 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
next
each
1
next
each
2
next
each
3
next
each
4
next
each
5
each
complete
finally
Process finished with exit code 0
buffer 的使用
//buffer 可以将一个长数据流切割为想要长度的数据流 用途比如需要保存大量的数据的时候可以用这个方法
//对数据切割,分段保存
Flux.interval(Duration.ofMillis(10)).buffer(10).subscribe(System.out::println);
Thread.sleep(100000L);
输出结果
18:12:44.159 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
flatmap, map 的区别
先看下图
image.png
返回的不一样
flatMap必须返回的是个publisher 而map 返回的是一个对象 这个对象既可以是item 也可以是封装为Flux ,Mono
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
Flux.fromIterable(list).flatMap(s -> {
return Mono.just(s);
});
Flux.fromIterable(list).flatMap(s -> {
return Flux.just(s);
});
Flux.fromIterable(list).map(s -> {
return Flux.just(s);
});
Flux.fromIterable(list).map(s -> {
return Mono.just(s);
});
Flux.fromIterable(list).map(s -> {
return s;
});