Java ProjectReactor框架之Flux篇

Spring5现处在第四个预发布版,正式版将要发布了,它带来的一大特性就是响应式框架Spring WebFlux。默认使用ProjectReactor框架。因此。本文通过ProjectReactor中的Flux,来学习使用该框架,以及了解其传递的思想。

本文基于Reactor3.1 rc1

Reactor官方地址http://projectreactor.io/,官方文档写的十分详细,如果您有不错的英文能力,建议直接阅读官方文档。

Spring WebFlux 实践

首先,为大家带来一个使用了ProjectReactor的例子,该例子使用Spring Boot 2.0.0.BUILD-SNAPSHOT。因Spring Boot推荐默认配置(约定)优先,可以极大减少大量的重复的模版化代码,简化搭建过程。

Spring Boot 2.0.0稳定版还未出,不过也快了,目前处在第四个里程碑版本。

step1:搭建环境

spring boot部分工具如idea提供了可视化操作,选择reactive-web模块即可(你也可以多选一些你需要的模块),如果没有可视化的工具,也可访问官网的开始页面https://start.spring.io/,或者在pom中引入一下模块(web开发主流仍是maven,所以未采用gradle)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

step2:编写处理类

编写一个简单的处理类,TestHandler

@Service
@NonNullApi
public class TestHandler {

    public Mono<ServerResponse> data(ServerRequest request){
        Mono<String> mono =  Mono.justOrEmpty(request.queryParam("data"))
                .defaultIfEmpty("this is null")
                .map(it -> it.concat("! from server webflux!"));
        return ServerResponse.ok().body(mono,String.class);
    }

}

step3:编写路由

spring webflux也提供了函数试的路由配置,如下

@Configuration
public class RoutingConfiguration {

    //...

    @Bean
    public RouterFunction<ServerResponse> testRouterFunction(TestHandler handle) {
        return RouterFunctions.route(GET("/test").and(accept(APPLICATION_JSON)), handle::data);
    }

}

step4:测试,验证

当浏览器输入http://localhost:8080/test,得到结果:this is null! from server webflux!
当浏览器输入http://localhost:8688/test?data=hi,得到结果:hi! from server webflux!

我的webflux项目地址:GitHub

深入学习

看过实践后,你会发现有大量的使用Flux和Mono,它们是什么呢?

Flux<T> 继承自 Publisher<T> ,用于代表拥有 0 到 n 元素的流,相对于 Mono<T> (其包含0-1个元素) 更加复杂。所以弄懂了Flux,其实也已经对Mono熟悉了。

静态方法

Flux一般通过静态方法构造,所以先看看它的静态方法。

combineLatest

public static <T,V> Flux<V> combineLatest(Function<Object[],V> combinator, Publisher<? extends T>... sources)
构建一个Flux,混合由多个的发布者发布最新事件.

Type Parameters:
T - 表示发布者的事件类型
V - 被混合者混合后的类型
Parameters:
sources - 发布者,提供事件
combinator - 混合者,接受最新的事件,处理并传递给下游。
Returns: 一个以Flux为基础的混合流
不同的参数方法很多,这里都只展示一个。

concat

public static <T> Flux<T> concat(Publisher<? extends T>... sources)
用于连接一个流。与combineLatest不同的是,concat都是在前一个流完成后在连接新的流。而combineLatest,则哪个事件最先到的,哪个先处理。

Type Parameters:
T - 事件的类型
Parameters:
sources - 一系列的发布者
Returns: 一个新的Flux连接了所有的发布者,并传递给下游

concatDelayError

拥有与concat类似的方法,不同的是,遇到错误不提前拦截,而是等到最后发布的事件处理完成后

create,push

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
通过FluxSink API,以同步或者异步方式创建Flux。
例如:

 Flux.<String>create(emitter -> {

     ActionListener al = e -> {
         emitter.next(textField.getText());
     };
     // without cleanup support:

     button.addActionListener(al);

     // with cleanup support:

     button.addActionListener(al);
     emitter.onDispose(() -> {
         button.removeListener(al);
     });
 });

这是非常有用的,如果一个流,需要动态添加或者移除其他的多个事件,通过异步的api。而且,你将不必担心被取消和背压。
create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure) 设置背压方式
push方法用处与使用方式与create几乎一致,它们唯一的区别在于CreateMode类型 create为PUSH_PULL,而push为PUSH_ONLY,从文档中也可以一个为多线程一个为单线程

backpressure(背压)概念的理解

这里,我摘自一位大神的话,背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。简而言之,背压是流速控制的一种策略。

更多的背压到http://www.jianshu.com/p/2c4799fa91a4这里不多做介绍了

defer

public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)
这个方法提供了一种惰性策略,发布者不会一开始发布消息,直到订阅者创建实例.


Type Parameters:
T - 发布者发布或订阅者接受的类型
Parameters:
supplier - 一个发布者的供应者,当订阅的时候回调
Returns: 一个惰性的Flux

empty

public static <T> Flux<T> empty()
创建一个不含任何事件的流.

error

public static <T> Flux<T> error(Throwable error)
返回一个带着立即终止标识和错误信息的流

first

public static <I> Flux<I> first(Publisher<? extends I>... sources)
挑选出第一个发布者,由其提供事件。能有效避免多个源的冲突。

from

public static <T> Flux<T> from(Publisher<? extends T> source)
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)
public static <T> Flux<T> fromStream(Stream<? extends T> s)
从一个发布者创建一个flux流

fromArray,fromIterable,fromStream

public static <T> Flux<T> fromArray(T[] array)
通过一个数组,或者一个可迭代的元素,或者一个流,创建flux流.

Type Parameters:
T - 数组的类型和Flux的类型
Parameters:
emmm.. - 数组,可迭代的元素,流
Returns: 新的flux流

generate

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
Programmatically create a Flux by generating signals one-by-one via a consumer callback.

Type Parameters:
T - the value type emitted
Parameters:
generator - Consume the SynchronousSink provided per-subscriber by Reactor to generate a single signal on each pass.
Returns: a Flux
没看懂,好像是说,通过编程方式创建一个一对一的消费回调

interval

public static Flux<Long> interval(Duration period)
间隔一定的时间,发送事件。
Runs on the Schedulers.parallel() Scheduler.

just

public static <T> Flux<T> just(T... data)
创建一个包含一系列元素的flux流

merge

public static <I> Flux<I> merge(Publisher<? extends I>... sources)
混合多个流,和combineLatest类似,但它要求是同类型的流合并,combineLatest需要提供合并方式

never

public static <T> Flux<T> never()
Create a Flux that will never signal any data, error or completion signal.

Type Parameters:
T - the Subscriber type target
Returns:
a never completing Flux
看一看,不是很明白,该流的用处。

range

public static Flux<Integer> range(int start, int count)
提供从start,到start + count的所有整数的flux流

switchOnNext

public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers)
从最新的发布者那里获取事件,如果有新的发布者加入,则改用新的发布者。
当最后一个发布者完成所有发布事件,并且没有发布者加入,则flux完成。

using

public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Eager resource cleanup happens just before the source termination and exceptions raised by the cleanup Consumer may override the terminal even.

zip

public static <I,O> Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
通过混合者,合并多个流成一个输出流,一一对应合并


...

看一下下面的api
public static <T1,T2,T3,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Object[],V> combinator)
public static <T1,T2,T3,T4,V> Flux<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Object[],V> combinator)
...
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, lisher<? extends T2> source2)
public static <T1,T2,T3> Flux<Tuple3<T1,T2,T3>> zip(Publisher<? extends T1> source1, lisher<? extends T2> source2, lisher<? extends T3> source3)
...
ヽ(o_ _)o摔倒,我也是服了project reactor 官方。

常用的实例方法

静态的方法介绍完了,但是实例方法比静态方法多太多,所以这里只举常用的几种介绍

all,any,hasElement,hasElements

这几个方法调用,均返回包涵一个Boolean信号的Mono。

  • all(Predicate<? super T> predicate)表示所有值均满足条件
  • any(Predicate<? super T> predicate)表示存在一个值满足条件
  • hasElement(T t)表示是否存在该值
  • hasElements()表示是否拥有一个或多个元素

as,compose

public final <P> P as(Function<? super Flux<T>,P> transformer)
转化flux为一个目标类型。
官方例子:flux.as(Mono::from).subscribe()
将flux通过Mono.from函数转化为mono
public final <V> Flux<V> compose(Function<? super Flux<T>,? extends Publisher<V>> transformer)
compose与as的区别是转化类型做了限制,必须继承Publisher,同时compose是惰性的。在很多时候,写法上没有差别如flux.compose(Mono::from).subscribe()

blockFirst,blockLast

阻塞至第一个或者最后一个值处理完成

butter系列

该系列实例方法很多,作用是将一系列元素,分成一组或者多组,该方法可用在按组批量操作上,例如,以时间间隔分组,批量添加数据。

cache

如其名缓存,相当于复制一份用于接下来的操作,而当前的流将会被缓存起来,用于之后的操作。

cancelOn

public final Flux<T> cancelOn(Scheduler scheduler)
取消

cast

public final <E> Flux<E> cast(Class<E> clazz)
强转

checkpoint

用于检测当前节点,流中是否存在错误

collect系列

该系列实例方法,用于收集所有的元素到特定类型,如list、map等
处理完成时返回Mono

concatMap系列,flatMap系列

举例说明吧,[[1,2],[4,5],[6,7,8]] -> [1,2,4,5,6,7,8]起这种转化作用


flatMap系列一样

concatWith

与concatMap不同,这是相加

defaultIfEmpty

public final Flux<T> defaultIfEmpty(T defaultV) 默认值

distinct

public final Flux<T> distinct()
去重,相对与jdk8,多了下面两种方法
public final <V> Flux<T> distinct(Function<? super T,? extends V> keySelector)
public final <V,C extends Collection<? super V>> Flux<T> distinct(Function<? super T,? extends V> Supplier<C> distinctCollectionSupplier)
去除与V匹配的和第二个不怎么理解,,,这让我想到了filter

do系列

还系列有doOnNext,doOnError,doOnCancel等等,均表示完成后触发

elementAt

返回某一位置的值,类型为Mono<T>,可以设置默认值

filter

public final Flux<T> filter(Predicate<? super T> p)
过滤出满足条件的

groupBy

public final <K> Flux<GroupedFlux<K,T>> groupBy(Function<? super T,? extends K> keyMapper)


分组,根据提供的keyMapper

My Blog: https://www.dnocm.com/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,642评论 18 139
  • 一、基本数据类型 注释 单行注释:// 区域注释:/* */ 文档注释:/** */ 数值 对于byte类型而言...
    龙猫小爷阅读 4,257评论 0 16
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,604评论 18 399
  • 通用文件夹使用要求 Editor文件夹 由于unity是分别编译的,会生成不同的dll文件,比如: Assembl...
    霸俊流年阅读 1,205评论 0 2
  • 中国是少数几个女性自杀率高于男性自杀率的国家之一
    柳卿卯木阅读 125评论 0 0