学习响应式编程 Reactor (3) - reactor 基础

Reactor

Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范的响应式库。

Reactor 提供了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作符。一个 Flux 代表 0...N 个元素的响应式流;一个 Mono 代表 0|1 个元素的响应式流。

Flux 和 Mono 之间可以转换,比如 Flux 的 count 操作(计算流中元素个数)返回 Mono,Mono 的 concatWith 操作(连接另一个响应式流)返回 Flux。

Flux

Flux<T> 是一个能够发出 0 到 N 个元素的标准 Publisher<T>,它会被一个完成(completion)或错误(error)信号终止。因此,一个 Flux 的可能结果是 value、completion
或 value,这三个分别会传递给订阅者中的 onNext、onComplete、onError 方法。

注意:所有的信号事件,包括代表终止的信号事件都是可选的。如果没有 onNext 事件,但是有 onComplete 事件,那么发出的就是空的有限流;如果去掉 onComplete
就得到一个 无限的空数据流。无限的数据流可以不是空的,比如 Flux.interval(Duration) 生成的是一个 Flux<Long>,这是一个无限周期性发出规律整数的时钟数据流。

下图展示的是 Flux 基于时间线的弹珠交互图,通过操作符转换 Flux 中元素:

04_reactor_flux_transform.png
  • 上面那条线表示的是 Flux 数据流时间线,时间从左至右
  • 上面那条线中的弹珠代表示的是 Flux 发出的 数据元素
  • 上面那条线最后的垂直线表示的是 Flux 已经完成成功事件
  • 中间的箭头虚线和框表示的是 Flux 中的元素正在被转换,框内的文字表示的是转换的方式(包含操作符)
  • 下面那条线表示的是 FLux 经过转换后的新数据流
  • 如果由于某种原因导致 Flux 的转换终止,将使用 X 来代替 垂直线

后续在学习操作符的过程中,我们将见到很多类似的弹珠图,请大家详细了解清楚该图各部分的含义。

Mono

Mono<T> 是一种特殊的Publisher<T>,它最多只能发出一个元素,然后(可选的)终止于 onComplete 或 onError 信号。

Mono 中的操作符是 Flux 中操作符的子集,即 Flux 中只有部分操作符适用于 Mono,有些操作符是将 Mono 和另一个 Publisher 连接转换为 Flux。例如,Mono#concatWith(Publisher
) 转换为 Flux,Mono#then(Mono) 返回另一个 Mono。

注意:可以使用 Mono<Void> 来创建一个只有完成概念的空值异步处理过程(类似于 Runnable)。

下图展示的是 Mono 基于时间线的弹珠交互图:

05_reactor_mono_transform.png

创建 Flux 和 Mono

如同创建 Java Stream 一样,Reactor 也为我们提供了 多个工厂方法用来创建 Flux 和 Mono,有了 Stream 的基础,创建的基本方法我们来快速过一下。

下面的创建方法,如果是 Flux 或 Mono 独有的,会在方法名前增加类名前缀。

下面的示例代码中都有用到 subscribe 方法,下面会讲到,大家先了解它是响应式流的订阅方法,用于触发流,类似于 Java Stream 中的终端操作。

just

使用提供的元素发出数据然后结束的流。

06_reactor_flux_just.png
Mono.just("hello, world").subscribe(System.out::println);
Mono.justOrEmpty(str).subscribe(System.out::println);
Mono.justOrEmpty(optional).subscribe(System.out::println);

Flux.just("hello", "world").subscribe(System.out::println);
Flux.just("hello").subscribe(System.out::println);

Flux#fromXxx

Flux 提供了 fromArray(从数组)、fromIterable(从迭代器)、fromStream(从 Java Stream 流) 的方式来创建 Flux。

String[] array = new String[]{"hello", "reactor", "flux"};
List<String> iterable = Arrays.asList("foo", "bar", "foobar");

Flux.fromArray(array).subscribe(System.out::println);
Flux.fromIterable(iterable).subscribe(System.out::println);
Flux.fromStream(Arrays.stream(array)).subscribe(System.out::println);

Flux#range

从 start 开始构建一个 Flux,该 Flux 仅发出一系列递增计数的整数。 也就是说,在 start(包括)和 start + count(排除)之间发出整数,然后完成。见图识意:

07_reactor_flux_range.png
Flux.range(3, 5).subscribe(System.out::println);

Flux#interval

在全局计时器上创建一个 Flux,该 Flux 在初始延迟后,发出从0开始并以指定的时间间隔递增的长整数。 如果未及时产生,则会通过溢出 IllegalStateException 发出 onError
信号,详细说明无法发出的原因。 在正常情况下,Flux 将永远不会完成。interval 提供了 3 个重载方法,三者的区别主要在于是否延迟发出、以及使用的调度器。

interval 生成的是一个无限数据流。

Flux<Long> interval(Duration period)
Flux<Long> interval(Duration delay, Duration period)
Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
  • 第 1 个方法,没有延迟,按照 period 的周期立即发出,默认使用 Schedulers.parallel() 调度器
  • 第 2 个方法,以 delay 延迟,按照 period 的周期发出,默认使用 Schedulers.parallel() 调度器
  • 第 3 个方法,以 delay 延迟,按照 period 的周期发出,使用指定的调度器

见图识意:

10_reactor_flux_interval.png
Flux.interval(Duration.ofMillis(30), Duration.ofMillis(500)).subscribe(System.out::println);

empty

生成一个空的有限流。见图识意:

08_reactor_mono_empty.png
Flux.empty().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));

never

生成一个空的无限流。见图识意:

09_reactor_flux_never.png
Flux.never().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));

error

生成一个错误流。error 有 3 个重载方法,它们的主要区别是否立即生成错误及是否由 Supplier 提供,见图识意:

13_reactor_flux_error.png
Flux.error(new IllegalStateException(), true)
    .log()
    .subscribe(System.out::println, System.err::println);

其它

Flux 和 Mono 还提供了编程式的创建数据流的方法,诸如 create、generate、push、handle 等的方式,这些内容暂时不是我们的重点,这里我们不细展开,感兴趣的可看 Api 进行研究下。

11_reactor_flux_generate.png

订阅 Flux 和 Mono

在上面创建 Flux 和 Mono 笔记的示例代码中,我们已经提到了 subscribe 订阅,在 subscribe 订阅中,Flux 和 Mono 支持 Java 8 Lambda 表达式。下面我们来看看 Reactor
为我们提供了哪些订阅方法。

subscribe(); // ①

subscribe(Consumer<? super T> consumer);  // ②

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // ③

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); // ④

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); // ⑤

subscribe(Subscriber<? super T> actual); // ⑥
  1. 序号① 订阅并触发响应式流。
  2. 序号② 对每个生成的元素进行消费。
  3. 序号③ 对正常元素进行消费,对错误进行响应处理。
  4. 序号④ 对正常元素和错误均有响应,还定义了响应流正常完成后的回调。
  5. 序号⑤ 对正常元素、错误信号和完成信号均有响应,同时也定义了 对该 subscribe 返回的 Subscription 的回调处理。
  6. 序号⑥ 通过自定义实现 Subscriber 接口来订阅。

注意:序号⑤ 变量传递一个 Subscription 的引用,如果不再需要更多元素时,可以通过它来取消订阅。取消订阅时,源头会停止生成数据,并清理相关的资源。取消和清理的操作是在 Disposable 接口中定义的。

来看下序号 ⑤ 的 subscribe 的弹珠图:

12_reactor_flux_subscribe.png
Flux.range(1, 4)
        .subscribe(System.out::println,
                error -> System.err.println("发生错误:" + error),
                () -> System.out.println("完成"),
                sub -> {
                    System.out.println("已订阅");
                    // 理解背压
                    // 尝试修改下 request 中的值,看看有啥变化
                    sub.request(10);
                });

注意:序号⑥ 的方式支持背压等操作,不在我们本次笔记的范畴,我们还是先略过,后期在学习。

补充

在上节我们讲解 Reactor 调试部分时,遗漏了记录数据流的日志方法,再此做下补充:除了基于 stack trace 的方式调试分析,我们还可以使用 log
操作符,来跟踪响应式流并记录日志。将它添加到操作链上之后,它会读取每一个再其上游的 Flux 和 Mono 事件(包括 onNext、onError、onComplete、Subscribe、Cancel 和 Request)。

// 尝试交换下 take 和 log 的顺序,看看有啥变化
Flux.range(1, 10)
        // .log()
        .take(3)
        .log()
        .subscribe();

总结

本篇我们介绍了 Reactor 的基础知识:先是了解了 Reactor 为我们提供的响应式流类 Flux 和 Mono,之后学习了如何创建他们和订阅他们,因为有之前 Stream
的基础,想来大家对这些知识点都好理解和接受。

今天的内容就学到这里,我们下篇开始学习 Reactor 的操作符。

源码详见:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模块下 ReactorBasicLearningTest 测试类。

参考

  1. Reactor 3 Reference Guide
  2. Reactor 3 中文指南
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352