reactor3 flux 多个订阅者

在一个数据源里想要有多个订阅者消费时应该怎么做呢?

        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        Flux<String> flux = Flux.fromIterable(list);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);
        flux.subscribe(System.out::println);

那么如果我想要有三个个订阅者的时候才开始消费数据源该如何做呢

        Flux<String> flux = Flux.fromIterable(list);
        ConnectableFlux<String> con = flux.publish();
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        con.subscribe(System.out::println);
        // 手动的开启消费数据
        con.connect();

如果感觉手动开启太麻烦也可以这样

    // autoConnect(3) 表示如果订阅者达到三个 就自动开启
        Flux<String> auto = flux.publish().autoConnect(3);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);

如果感觉还不够好的,比如当有一个订阅者突然断开了,我想停止消费数据该怎么做呢

      // 如果订阅者少于三个就会停止消费数据,直到订阅者达到三个为止
        Flux<String> auto = flux.publish().refCount(3);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);

      // 如果订阅者少于三个且超过十秒没有新的订阅才会停止消费数据
        Flux<String> auto = flux.publish().refCount(3,Duration.ofSeconds(10));
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);
        auto.subscribe(System.out::println);

        Thread.sleep(1000L);
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容