reactor3 flux 的使用

reactor 本质上是观察者模式的使用
添加数据

 List<String> list = new ArrayList<>();
 list.add("1");
 list.add("2");
 list.add("3");
 list.add("4");
 list.add("5");
// 添加数据 通过流的方式
Flux.fromStream(list.stream());
// 只包含当前的数据流 
Flux.just("1","2");
// 空的数据流
Flux.empty();
// 手动添加数据流
Flux.create((i) -> {
       i.next("1");
       i.next("2");
       i.next("3");
       i.next("4");
   });
// 添加数据通过集合的方式
Flux.fromIterable(list);
//   定时产生一个数据,当前方式是每
Flux.interval(Duration.ofMillis(100));

消费数据 subscribe方法

Flux flux =  Flux.create((i) -> {
            i.next("1");
            i.next("2");
            i.next("3");
            i.next("4");
        });
// 消费数据
flux.subscribe((s) -> {
             System.out.println(s);
   });
flux.subscribe(System.out::println)

flux.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                // System.out.println(s);
            }
        });

输出结果
16:37:50.894 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
1
2
3
4

Process finished with exit code 0


数据中间处理环节 do 系列接口

List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
// 添加数据源
Flux.fromIterable(list)
// 每处里一个数据前 就会执行该方法
.doOnNext((n) -> {
            //System.out.println("next");
        })
// 执行完数据流执行该方法
.doFinally((f) -> {
           // System.out.println("finally");
        })
// 每处里一个数据前就会执行该方法
.doOnEach((e) -> {
            //System.out.println("each");
        })
//执行结束之后输出
.doOnComplete(()->{
            System.out.println("complete");
        })
.subscribe((System.out::println));

执行结果
17:00:42.399 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
next
each
1
next
each
2
next
each
3
next
each
4
next
each
5
each
complete
finally

Process finished with exit code 0

buffer 的使用

//buffer 可以将一个长数据流切割为想要长度的数据流 用途比如需要保存大量的数据的时候可以用这个方法
//对数据切割,分段保存
Flux.interval(Duration.ofMillis(10)).buffer(10).subscribe(System.out::println);
        Thread.sleep(100000L);

输出结果
18:12:44.159 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]

flatmap, map 的区别
先看下图


image.png

返回的不一样
flatMap必须返回的是个publisher 而map 返回的是一个对象 这个对象既可以是item 也可以是封装为Flux ,Mono

        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Flux.fromIterable(list).flatMap(s -> {
            return Mono.just(s);
        });
        Flux.fromIterable(list).flatMap(s -> {
            return Flux.just(s);
        });
        Flux.fromIterable(list).map(s -> {
            return Flux.just(s);
        });
        Flux.fromIterable(list).map(s -> {
            return Mono.just(s);
        });
        Flux.fromIterable(list).map(s -> {
            return s;
        });
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容