声明:
一、Reactor的概念及引入
Reactor
是一个响应式的编程框架。什么是响应式编程呢,就我个人理解,简单来说,就是异步、事件响应、回调。编程中,经常会有许多这样的场景,比如需要等待某个耗时操作的响应,然后根据响应执行接下来的步骤,而在响应式编程中,对付这样的应用场景,我们选择将这些耗时操作丢到其他线程中去完成,而我们只需要给其一个回调,使得在操作完成时,能够触发回调,执行我们想要的步骤。说到这,你也许会说,Java中不是已经有Future
和Callbacks
来实现该功能吗,为何又需要Reactor
?那就先来说说Future
和Callbacks
的缺点吧。
Callbacks
相信不用我多说,Js中至少用的不少,其最明显的缺点就是“回调地狱”。
Future
则有点类似Js中的Promise
,异步操作会直接返回一个包装对象,该对象并不代表结果,但是可以通过它去获知异步操作是否已经完成并且是否可以获知结果等等。它的缺点在于,在操作合并任务的时候非常繁琐,因为合并任务本身也会返回一个Future
,实际上,对于某些任务的状态我们根本都不想去知道,我们希望看到的只是结果,这也是为什么Reactor
更为优秀的原因。
Reactor
有点像加强版的Stream
,只不过其更侧重于响应式编程方面。我们可以发现,一个事件完成时 => 下一个任务 => 下一个任务完成时 => 下下一个任务,这样一系列操作完全可以转换成为一个流,这也是Reactor
的特点所在。
二、Reactor的引入
注意:当前版本的Reactor
需要运行在JDK8
以上
以下是maven的依赖:
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Bismuth-RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
三、简单使用下Reactor
- 代码:
Flux<String> stringFlux = Flux.just("str1" , "str2" , "str3")
.map(item -> {
if(item.equals("str3")) throw new RuntimeException("test ex");
LockSupport.parkNanos(Duration.ofSeconds(2).toNanos());
return item;
});
stringFlux.subscribe(System.out::println , System.out::println , () -> System.out.println("done") , sub -> sub.request(3));
System.out.println("\n**********************************\n");
//stream
Stream<String> stringStream = Stream.of("str1" , "str2" , "str3")
.map(item -> {
if(item.equals("str3")) throw new RuntimeException("test ex");
return item;
});
stringStream.forEach(System.out::println);
代码分为两部分,上面那部分是使用
Reactor
操作数据源的结果,下面是使用Stream
操作数据源的结果。执行内容几乎相同,都是打印出前"str1"和"str2",在"str3"时抛出异常
- 执行结果:
str1
str2
java.lang.RuntimeException: test ex
**********************************
str1
str2
Exception in thread "main" java.lang.RuntimeException: test ex
看结果,你会发出疑问,说好的
Reactor
是异步操作呢?不用急,只是由于没有将其配置在别的线程上工作罢了,所以在此默认是使用subscribe
所在的线程,即主线程来执行任务的。类似于Stream
中的结束操作,在Reactor
中,流的激活是在subscribe
方法被调用的那一刻开始的。
参考文档:
[1] Reactor api doc
[2] Reactor reference doc