Spring WebFlux 概述
图一:
图二:
从图可以看出对支持Spring 5的Spring Boot 2.0来说,新加入的响应式技术栈是其主打核心特性。
左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。
Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux格式,以便进行统一处理。
值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。
示例:
1、增加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2、DAO层:
import com.getset.webfluxdemo.model.User;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Mono<User> findByUsername(String username);
Mono<Long> deleteByUsername(String username);
}
3、service层:
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
public Flux<User> findAll() {
return userRepository.findAll().log();
}
public Mono<Long> deleteByUsername(String username) {
return userRepository.deleteByUsername(username);
}
public Mono<User> findByUsername(String username) {
return userRepository.findByUsername(username);
}
}
4、Spring MVC注解声明Reactive Controller:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping(“/user”)
public class UserController {
@Autowired
private UserService userService;
@DeleteMapping(“/{username}”)
public Mono<Long> deleteByUsername(@PathVariable String username) {
return this.userService.deleteByUsername(username);
}
@GetMapping(“/{username}”)
public Mono<User> findByUsername(@PathVariable String username) {
return this.userService.findByUsername(username);
}
@GetMapping(value = “”, produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
public Flux<User> findAll() {
return this.userService.findAll().delayElements(Duration./ofSeconds/(1));
}
}
5、Router Functions接口实现
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class AllRouters {
@Bean
RouterFunction<ServerResponse> userRouter(UserHandler handler) {
return nest(
// 相当于类上面的 @RequestMapping("/user")
path("/user"),
// 下面的相当于类里面的 @RequestMapping
// 得到所有用户
route(GET("/"), handler::getAllUser)
// 创建用户
.andRoute(POST("/").and(accept(MediaType.APPLICATION_JSON_UTF8)),
handler::createUser)
// 删除用户
.andRoute(DELETE("/{id}"), handler::deleteUserById));
}
}
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Component
public class UserHandler {
private final UserRepository repository;
public UserHandler(UserRepository rep) {
this.repository = rep;
}
/**
* 得到所有用户
*/
public Mono<ServerResponse> getAllUser(ServerRequest request) {
return ok().contentType(APPLICATION_JSON_UTF8)
.body(this.repository.findAll(), User.class);
}
/**
* 创建用户
*/
public Mono<ServerResponse> createUser(ServerRequest request) {
// 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常
Mono<User> user = request.bodyToMono(User.class);
return user.flatMap(u -> {
// 校验代码需要放在这里
CheckUtil.checkName(u.getName());
return ok().contentType(APPLICATION_JSON_UTF8)
.body(this.repository.save(u), User.class);
});
}
/**
* 根据id删除用户
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request) {
String id = request.pathVariable("id");
return this.repository.findById(id)
.flatMap(
user -> this.repository.delete(user).then(ok().build()))
.switchIfEmpty(notFound().build());
}
}
6、实现持续推送消息:mongDB-tail
@RestController
@RequestMapping("/events")
public class MyEventController {
@Autowired
private MyEventRepository myEventRepository;
@GetMapping(path = "", produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
public Flux<MyEvent> getEvents() {
return this.myEventRepository.findBy();
}
}
import com.getset.webfluxdemo.model.MyEvent;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;
public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
@Tailable
Flux<MyEvent> findBy();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "event")
public class MyEvent {
@Id
private Long id;
private String message;
}
(5)Spring WebFlux快速上手——响应式Spring的道法术器 - 刘康的专栏 - CSDN博客
响应式Spring Data
开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。
目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。
Spring Boot 2.0 有两条不同的线:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive
1、对于Spring Data Reactive原,来的 Spring 针对 Spring Data (JDBC等)的事务管理肯定不起作用了。因为原来的 Spring 事务管理(Spring Data JPA)都是基于 ThreadLocal 传递事务的,其本质是基于 阻塞 IO 模型,不是异步的。但 Reactive 是要求异步的,不同线程里面 ThreadLocal 肯定取不到值了。如果想在Reactive 编程中做到事务,通过在参数上面传递 conn,复杂度较高。
2、对于Spring Data,想使用反应式编程,可以通过协程或线程异步集成。使用时注意:spring申明式事务管理时,线程边界保证事务在同一个线程中。
public Flux<Order> findAll() {
return Flux.fromCallable(
() -> orderRepository.findAll()
).subscribeOn(Schedulers.elastic());
}
orderRepository.findAll方法返回值类型List<Order>
为啥只能运行在 Servlet 3.1+ 容器
大家知道,3.1 规范其中一个新特性是异步处理支持。
异步处理支持:Servlet 线程不需一直阻塞,即不需要到业务处理完毕再输出响应,然后结束 Servlet线程。异步处理的作用是在接收到请求之后,Servlet 线程可以将耗时的操作委派给另一个线程来完成,在不生成响应的情况下返回至容器。主要应用场景是针对业务处理较耗时的情况,可以减少服务器资源的占用,并且提高并发处理速度。
WebFlux的实现需要容器的异步支持,所以 WebFlux 支持的容器可以是 Tomcat、Jetty(Non-Blocking IO API) ,也可以是 Netty 和 Undertow,其本身就支持异步容器。在容器中 Spring WebFlux 会将输入流适配成 Mono 或者 Flux 格式进行统一处理。
参考:
官网:
Web on Reactive Stack
23. WebFlux framework
响应式Spring的道法术器:
响应式Spring的道法术器(Spring WebFlux 教程) - CSDN博客
响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)-刘康的博客-51CTO博客
使用 Spring 5 的 WebFlux 开发反应式 Web 应用
聊聊 Spring Boot 2.0 的 WebFlux | 泥瓦匠-右侧关注我的公众号吧