4、webflux

Spring WebFlux 概述

图一:

webflux1.png

图二:


webflux2.png

从图可以看出对支持Spring 5的Spring Boot 2.0来说,新加入的响应式技术栈是其主打核心特性。

左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,从上到下依次是Router Functions,WebFlux,Reactive Streams三个新组件。
Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux格式,以便进行统一处理。

值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。

示例:

1、增加依赖

  <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId> 
    </dependency>

2、DAO层:

import com.getset.webfluxdemo.model.User;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Mono;

public interface UserRepository extends ReactiveCrudRepository<User, String> {
    Mono<User> findByUsername(String username);
    Mono<Long> deleteByUsername(String username);
}

3、service层:

@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;

    public Flux<User> findAll() {
        return userRepository.findAll().log();
    }
    public Mono<Long> deleteByUsername(String username) {
        return userRepository.deleteByUsername(username);
    }
    public Mono<User> findByUsername(String username) {
        return userRepository.findByUsername(username);
    }
}

4、Spring MVC注解声明Reactive Controller:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping(“/user”)
public class UserController {
    @Autowired
    private UserService userService;

    @DeleteMapping(“/{username}”)
    public Mono<Long> deleteByUsername(@PathVariable String username) {
        return this.userService.deleteByUsername(username);
    }
    @GetMapping(“/{username}”)
    public Mono<User> findByUsername(@PathVariable String username) {
        return this.userService.findByUsername(username);
    }

    @GetMapping(value = “”, produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
    public Flux<User> findAll() {
        return this.userService.findAll().delayElements(Duration./ofSeconds/(1));
    }
}

5、Router Functions接口实现

import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class AllRouters {

    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler handler) {
        return nest(
                // 相当于类上面的 @RequestMapping("/user")
                path("/user"),
                // 下面的相当于类里面的 @RequestMapping
                // 得到所有用户
                route(GET("/"), handler::getAllUser)
                        // 创建用户
                        .andRoute(POST("/").and(accept(MediaType.APPLICATION_JSON_UTF8)),
                                handler::createUser)
                        // 删除用户
                        .andRoute(DELETE("/{id}"), handler::deleteUserById));
    }
}
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Component
public class UserHandler {
    private final UserRepository repository;
    public UserHandler(UserRepository rep) {
        this.repository = rep;
    }
    /**
     * 得到所有用户
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request) {
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(this.repository.findAll(), User.class);
    }

    /**
     * 创建用户
     */
    public Mono<ServerResponse> createUser(ServerRequest request) {
        // 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常
        Mono<User> user = request.bodyToMono(User.class);

        return user.flatMap(u -> {
            // 校验代码需要放在这里
            CheckUtil.checkName(u.getName());

            return ok().contentType(APPLICATION_JSON_UTF8)
                    .body(this.repository.save(u), User.class);
        });
    }

    /**
     * 根据id删除用户
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request) {
        String id = request.pathVariable("id");

        return this.repository.findById(id)
                .flatMap(
                        user -> this.repository.delete(user).then(ok().build()))
                .switchIfEmpty(notFound().build());
    }

}

6、实现持续推送消息:mongDB-tail

@RestController
@RequestMapping("/events")
public class MyEventController {
    @Autowired
    private MyEventRepository myEventRepository;

    @GetMapping(path = "", produces = MediaType./APPLICATION_STREAM_JSON_VALUE/)
    public Flux<MyEvent> getEvents() {
        return this.myEventRepository.findBy();
    }
}

import com.getset.webfluxdemo.model.MyEvent;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;

public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
    @Tailable
    Flux<MyEvent> findBy();
}

@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "event")
public class MyEvent {
    @Id
    private Long id;
    private String message;
}

(5)Spring WebFlux快速上手——响应式Spring的道法术器 - 刘康的专栏 - CSDN博客

响应式Spring Data

开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。
目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。
Spring Boot 2.0 有两条不同的线:
Spring Web MVC -> Spring Data
Spring WebFlux -> Spring Data Reactive

1、对于Spring Data Reactive原,来的 Spring 针对 Spring Data (JDBC等)的事务管理肯定不起作用了。因为原来的 Spring 事务管理(Spring Data JPA)都是基于 ThreadLocal 传递事务的,其本质是基于 阻塞 IO 模型,不是异步的。但 Reactive 是要求异步的,不同线程里面 ThreadLocal 肯定取不到值了。如果想在Reactive 编程中做到事务,通过在参数上面传递 conn,复杂度较高。
2、对于Spring Data,想使用反应式编程,可以通过协程或线程异步集成。使用时注意:spring申明式事务管理时,线程边界保证事务在同一个线程中。

          public Flux<Order> findAll() {
              return Flux.fromCallable(
                              () ->  orderRepository.findAll()
                         ).subscribeOn(Schedulers.elastic());
       }
       orderRepository.findAll方法返回值类型List<Order>

为啥只能运行在 Servlet 3.1+ 容器

大家知道,3.1 规范其中一个新特性是异步处理支持。
异步处理支持:Servlet 线程不需一直阻塞,即不需要到业务处理完毕再输出响应,然后结束 Servlet线程。异步处理的作用是在接收到请求之后,Servlet 线程可以将耗时的操作委派给另一个线程来完成,在不生成响应的情况下返回至容器。主要应用场景是针对业务处理较耗时的情况,可以减少服务器资源的占用,并且提高并发处理速度。
WebFlux的实现需要容器的异步支持,所以 WebFlux 支持的容器可以是 Tomcat、Jetty(Non-Blocking IO API) ,也可以是 Netty 和 Undertow,其本身就支持异步容器。在容器中 Spring WebFlux 会将输入流适配成 Mono 或者 Flux 格式进行统一处理。


参考:

官网:
Web on Reactive Stack
23. WebFlux framework

响应式Spring的道法术器:
响应式Spring的道法术器(Spring WebFlux 教程) - CSDN博客
响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)-刘康的博客-51CTO博客

使用 Spring 5 的 WebFlux 开发反应式 Web 应用
聊聊 Spring Boot 2.0 的 WebFlux | 泥瓦匠-右侧关注我的公众号吧

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容