3、 reactor

Reactor 简介

前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

Reactor是JVM的完全非阻塞反应式编程基础,具有高效的需求管理(以管理“背压”的形式)。它直接与Java 8功能的API,特别是整合CompletableFuture,Stream和 Duration。它提供了可组合的异步序列API Flux(用于[N]元素)和Mono(用于[0 | 1]元素),广泛地实现了Reactive Extensions规范。这段的重点是和Java8结合利用lambda表达式简洁的优点。

使用

在 Reactor 中,经常使用的类并不是很多,主要有以下两个:
Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。
Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

创建 Flux

有多种不同的方式可以创建 Flux 序列。
Flux 类的静态方法
第一种方式是通过 Flux 类中的静态方法。
just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
empty():创建一个不包含任何元素,只发布结束消息的序列。
error(Throwable error):创建一个只包含错误消息的序列。
never():创建一个不包含任何消息通知的序列。
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);

上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。

       //通过generate生成元素
        Flux.generate(sink -> {
            sink.next("Hello");
            sink.complete();
        }).subscribe(System.out::println);

        final Random random = new Random();
        Flux.generate(ArrayList::new, (list, sink) -> {
            int value = random.nextInt(100);
            list.add(value);
            sink.next(value);
            if (list.size() == 10) {
                sink.complete();
            }
            return list;
        }).subscribe(System.out::println);

        //通过create生成元素
        Flux.create(sink -> {
            for (int i = 0; i < 10; i++) {
                sink.next(i);
            }
            sink.complete();
        }).subscribe(System.out::println);

Mono 的创建

Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法。

fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
还可以通过 create()方法来使用 MonoSink 来创建 Mono。代码清单 4 中给出了创建 Mono 序列的示例。

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

操作符

filter:
对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

zipWith:
zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。

Flux.just("a", "b")
        .zipWith(Flux.just("c", "d"))
        .subscribe(System.out::println);
Flux.just("a", "b")
        .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
        .subscribe(System.out::println);

reduce 和 reduceWith:
reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

flatMap 和 flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。

Flux.just(5, 10)
        .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

调度器

前面介绍了反应式流和在其上可以进行的各种操作,通过调度器(Scheduler)可以指定这些操作执行的方式和所在的线程。有下面几种不同的调度器实现。

当前线程,通过 Schedulers.immediate()方法来创建。
单一的可复用的线程,通过 Schedulers.single()方法来创建。
使用弹性的线程池,通过 Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。如果一个线程闲置太长时间,则会被销毁。该调度器适用于 I/O 操作相关的流的处理。
使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。

某些操作符默认就已经使用了特定类型的调度器。比如 intervalMillis()方法创建的流就使用了由 Schedulers.timer()创建的调度器。通过 publishOn()和 subscribeOn()方法可以切换执行操作的调度器。其中 publishOn()方法切换的是操作符的执行方式,而 subscribeOn()方法切换的是产生流中元素时的执行方式。
使用调度器切换操作符执行方式:

Flux.create(sink -> {
    sink.next(Thread.currentThread().getName());
    sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);

在以上代码中,使用 create()方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。接着是两对 publishOn()和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。最后通过 subscribeOn()方法来改变流产生时的执行方式。运行之后的结果是[elastic-2] [single-1] parallel-1。最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single()调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。

回压

回压的处理有以下几种策略:
IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。
ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。
DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
LATEST:让下游只得到上游最新的元素。
BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。

方法签名:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
默认(没有第二个参数的方法)是缓存策略的,我们来试一下别的策略,比如DROP的策略。

@Test
    public void OverflowStrategy() throws InterruptedException {
        Flux flux = Flux.create(sink -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("create " + i + " ---");
                sink.next(i);

            }
            sink.complete();
        }, FluxSink.OverflowStrategy.BUFFER) //1 调整不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)观察效果,create方法默认为BUFFER;
                .publishOn(Schedulers.newSingle("newSingle"), 1)   //2 使用publishOn让后续的操作符和订阅者运行在一个单独的名为newSingle的线程上,第二个参数1是预取个数
                .doOnComplete(() -> System.out.println("completed!")) // 结束时打印
                .doOnRequest(n -> System.out.println("Request " + n + " values..."));//3 打印出每次的请求

        flux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) { // 4
                System.out.println("Subscribed and make a request...");
                request(1); // 5
            }

            @Override
            protected void hookOnNext(Integer value) {  // 6
                try {
                    TimeUnit.SECONDS.sleep(1);  // 7
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Get value [" + value + "]");    // 8
                request(1); // 9
            }
        });
        Thread.sleep(100 * 1000);
    }

不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)观察到的结果不同。

Hot vs Cold

到目前为止,我们讨论的发布者,无论是Flux还是Mono,都有一个特点:订阅前什么都不会发生。当我们“创建”了一个Flux的时候,我们只是“声明”/“组装”了它,但是如果不调用.subscribe来订阅它,它就不会开始发出元素。

但是我们对“数据流”(尤其是乍听到这个词的时候)会有种天然的感觉,就是无论有没有订阅者,它始终在按照自己的步伐发出数据。就像假设一个人没有一个粉丝,他也可以发微博一样。

以上这两种数据流分别称为“冷”序列和“热”序列。所以我们一直在介绍的Reactor3的发布者就属于“冷”的发布者。不过有少数的例外,比如just生成的就是一个“热”序列,它直接在组装期就拿到数据,如果之后有谁订阅它,就重新发送数据给订阅者。Reactor 中多数其他的“热”发布者是扩展自Processor 的(下节会介绍到)。

下面我们通过对比了解一下两种不同的发布者的效果,首先是我们熟悉的“冷”发布者:

@Test
public void testCodeSequence() {
    Flux<String> source = Flux.*fromIterable*(Arrays.*asList*("blue", "green", "orange", "purple"))
            .map(String::toUpperCase);

    source.subscribe(d -> System.*out*.println("Subscriber 1: " + d));
    System.*out*.println();
    source.subscribe(d -> System.*out*.println("Subscriber 2: " + d));
}

我们对发布者source进行了两次订阅,每次订阅都导致它把数据流从新发一遍:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE

Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

然后再看一个“热”发布者的例子:

@Test
public void testHotSequence() {
    UnicastProcessor<String> hotSource = UnicastProcessor.*create*();

    Flux<String> hotFlux = hotSource.publish()
            .autoConnect()
            .map(String::toUpperCase);

    hotFlux.subscribe(d -> System.*out*.println("Subscriber 1 to Hot Source: " + d));

    hotSource.onNext("blue");
    hotSource.onNext("green");

    hotFlux.subscribe(d -> System.*out*.println("Subscriber 2 to Hot Source: " + d));

    hotSource.onNext("orange");
    hotSource.onNext("purple");
    hotSource.onComplete();
}

这个热发布者是一个UnicastProcessor,我们可以使用它的onNext等方法手动发出元素。上边的例子中,hotSource发出两个元素后第二个订阅者才开始订阅,所以第二个订阅者只能收到之后的元素:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

由此可见,UnicastProcessor是一个热发布者。

另一个示例:

@Test
public void codeTest() throws InterruptedException {
    //getNameByPhoney方法执行两次
    Mono<String> customerPhoneMono = Mono.fromSupplier(
            () -> ColdMonoHttp.getNameByPhone("18611854542")
    );
    //getNameByPhoney方法执行一次
    Mono<String> customerPhoneMono = Mono.fromFuture(
            CompletableFuture.supplyAsync(() -> ColdMonoHttp.getNameByPhone("18611854542"))
    );
    customerPhoneMono.subscribe(a -> System.out.println("-----订阅一:name=" + a));
    customerPhoneMono.subscribe(a -> System.out.println("-----订阅二:name=" + a));
    Thread.sleep(10000);
}

public static String getNameByPhone(String phone) {
    try {
        System.out.println("get name start:" + phone);
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
    System.out.println("get name end:" + phone);
    return "zhangsan";
}

参考:

官网:
GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM
Reactor 3 Reference Guide
GitHub & BitBucket HTML Preview

其它:
使用 Reactor 进行反应式编程
反应式编程介绍 - liudongdong_jlu - CSDN博客
什么是 Reactor? - Reactor 指南中文版 - 极客学院Wiki
Reactor 入门与实践 - Go语言中文网 - Golang中文社区

八个层面比较 Java 8, RxJava, Reactor - 云+社区 - 腾讯云

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容

  • 反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区...
    feiniao123阅读 6,708评论 0 6
  • 适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream ...
    编走编想阅读 50,274评论 39 116
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,490评论 18 139
  • Spring5现处在第四个预发布版,正式版将要发布了,它带来的一大特性就是响应式框架Spring WebFlux。...
    MrTT阅读 26,333评论 0 21
  • 我睁开了眼睛,发现自己躺在一个房子里。看上去应该是一个实验室吧,周围的桌子上都放满了仪器和药品。我用鼻子使劲地吸了...
    连接一切阅读 197评论 0 0