1、概要
Spring 在5.0版本引入WebFlux,增加响应式框架的支持,响应式编程是一种范式,它促进了数据处理的异步、非阻塞、事件驱动方法。那么到底什么是Reactive
呢?它就像是初恋,让人想要知道关于她的一切。
reactive
是指围绕对变化作出反映的编程模型,网络组建对I/O事件作出反映,UI控制器对鼠标事件作出反映,等等。从这个意义上说非阻塞(non-blocking
)就是reactive
。还有另外一个重要的机制,Spring将reacive
和非阻塞背压(non-blocking back pressure
)联系在一起。在同步的代码中,阻塞调用就是一种天然的背压的一种形式,它迫使调用者必须等待。在非阻塞的代码中,控制事件的速率就变的很重要,这样快速的事件生产者就不会压垮消费者。
2、Spring WebFlux Framework
Spring WebFlux是同Spring MVC同级别且完简单全支持响应流的web框架。“她”框架支持两种编程模式
-
Annotated Controllers
基于注解的响应式组件 -
Functional Endpoints
基于lambada轻量级函数式编程模型
下面我们就聚焦在第一种,基于注解响应式组件,如果你熟悉Spring MVC编程风格,那么你也能轻松使用WebFlux,下面我们温柔的扒一扒WebFlux
3、Reactive REST 应用
我们现在使用Spring WebFlux 创建一个简单的Reactive REST FuckingGreatWebfluxApplication
应用:
3.1 项目结构
.
├── pom.xml
└── src
├── main
│ ├── java
│ │ └── com
│ │ └── example# 错过了初恋,别错过WebFlux
│ │ └── webflux
│ │ ├── controller
│ │ │ └── UserController.java
│ │ ├── domain
│ │ │ └── User.java
│ │ ├── repository
│ │ │ └── UserRepository.java
│ │ └── service
│ │ └── UserService.java
│ │ ├── FuckingGreatWebfluxApplication.java
│ └── resources
│ └── application.properties
└── test
└── java
└── com
└── example
└── webflux
├── controller
│ └── UserControllerTest.java
└── FuckingGreatWebfluxApplicationTests.java
3.2、Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
通过Spring Initaializr新建工程的时候 选择如下:
3.3、Controller
controller层面提供了三个方法。getUser方法返回Mono,findUser返回的是Flux。Mono和Flux的区别在于,Mono返回的是0或者1个元素,Flux返回的是0或者多个元素。换句话说,如果想要从Spring MVC Web迁移到Reactive Web的话,凡是返回对象的则调整为Mono,凡是返回集合时则可以调整Flux即可(依实际情况)。
另外特别说明的是我在getUser和findUser方法里面都使用了.log
方法,她会打印出日志供我们观察。
@RestController
@RequestMapping("/v1")
public class UserController {
@Resource
UserService userService;
@GetMapping(path = "/users/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(code = HttpStatus.OK)
public Mono<User> getUser(@PathVariable("userId") Long userId) {
return userService.find(userId).log();
}
@PostMapping(path = "/users", consumes = MediaType.APPLICATION_JSON_VALUE)
@ResponseStatus(code = HttpStatus.CREATED)
public Mono<User> createUser(@RequestBody User user) {
return userService.create(user);
}
@GetMapping(path = "/users", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> findUser() {
return userService.findAll().delayElements(Duration.ofSeconds(1L)).log();
}
3.4、 Service
服务层代码,我在find方法里面特意延迟了5秒钟,我们后续观察一下日志运行情况
@Service
public class UserService {
@Resource
UserRepository userRepository;
public Mono<User> find(Long id) {
try {
Thread.sleep(5 * 1000L);
} catch (InterruptedException e) {
}
return userRepository.find(id);
}
public Mono<User> create(User user) {
return userRepository.create(user);
}
public Flux<User> findAll() {
return userRepository.findAll();
}
其他的代码这里不罗列了,详细看源码部分吧!
4、单元测试
4.1 、getUser测试
如下测试代码主要是获取ID为1的user,并且断言该user的firstname为Cattle
@Test
void getUser() {
webTestClient.get().uri("/v1/users/1").exchange().expectStatus().isOk().expectBody(User.class).value(
(Consumer<User>) user -> Assertions.assertEquals(user.getFirstName(), "Cattle"));
}
输出:
2020-05-11 00:07:24.849 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2020-05-11 00:07:24.855 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | request(unbounded)
2020-05-11 00:07:24.856 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onNext(User(id=1, firstName=Cattle, lastName=Ma))
2020-05-11 00:07:24.868 INFO 19356 --- [ parallel-1] reactor.Mono.Just.1 : | onComplete()
从日志打印可以看出,虽然我在获取的时候延迟了5秒啊,但是request和onComplete的时间完全一样,大家想一想,说明了什么呢?
4.2、findUser测试
测试代码如下
@Test
void findUser() {
webTestClient.get().uri("/v1/users")
.accept(MediaType.APPLICATION_STREAM_JSON)
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class).consumeWith(u -> u.getResponseBody().forEach(System.out::println));
}
输出:
2020-05-11 00:15:44.597 INFO 20503 --- [ parallel-1] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2020-05-11 00:15:44.603 INFO 20503 --- [ parallel-1] reactor.Flux.ConcatMap.1 : request(1)
2020-05-11 00:15:45.611 INFO 20503 --- [ parallel-2] reactor.Flux.ConcatMap.1 : onNext(User(id=1, firstName=Cattle, lastName=Ma))
2020-05-11 00:15:45.722 INFO 20503 --- [ main] reactor.Flux.ConcatMap.1 : request(31)
2020-05-11 00:15:46.637 INFO 20503 --- [ parallel-3] reactor.Flux.ConcatMap.1 : onNext(User(id=2, firstName=Tony, lastName=Ma))
2020-05-11 00:15:47.639 INFO 20503 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onNext(User(id=3, firstName=Jack, lastName=Ma))
2020-05-11 00:15:47.641 INFO 20503 --- [ parallel-4] reactor.Flux.ConcatMap.1 : onComplete()
User(id=1, firstName=Cattle, lastName=Ma)
User(id=2, firstName=Tony, lastName=Ma)
User(id=3, firstName=Jack, lastName=Ma)
从日志可以分析出,虽然我们只请求了一次,但是它内部实际上是request多次才获取到全部的结果,然后关闭了应用。思考一下,我们能不能做到无线推流(Server Send Event),而不会有onComplete出现呢?将不断产生的数据全部流到客户端。
5、源码
https://github.com/cattles/fucking-great-webflux.git
6、总结
前面说了那么多,而且还留下了一些疑问,那么你是否有继续探索的欲望了呢?好好交往吧,如同初恋一样!相信未来我们都会越来越多的见到异步非阻塞响应式编程曼妙的身影。本次介绍的内容比较少,主要还是在应用层面,那么数据库层是否也需要异步支持呢?不错,必须的。后面我会另外在介绍事件驱动的文章中提到支持的数据库Reactive Relational Database,请大家持续关注哦!
7、附录
Functional Endpoints: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-fn
Annotated Controllers: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-controller