Spring Cloud Stream还支持使用反应性API,其中将传入和传出数据作为连续数据流处理。通过spring-cloud-stream-reactive提供对反应性API的支持,需要将其明确添加到您的项目中。
具有反应性API的编程模型是声明式的,而不是指定如何处理每个单独的消息,您可以使用描述从入站到出站数据流的功能转换的运算符。
Spring Cloud Stream支持以下反应性API:
反应堆
RxJava 1.x
将来,它旨在支持基于活动流的更通用的模型。
反应式编程模型还使用@StreamListener注释来设置反应处理程序。差异在于:
@StreamListener注释不能指定输入或输出,因为它们作为参数提供,并从方法返回值;
必须使用@Input和@Output注释方法的参数,指示输入和分别输出的数据流连接到哪个输入或输出;
方法的返回值(如果有的话)将用@Output注释,表示要发送数据的输入。
注意反应式编程支持需要Java 1.8。
注意截至Spring Cloud Stream 1.1.1及更高版本(从布鲁克林发行版开始列出),反应式编程支持需要使用Reactor 3.0.4.RELEASE和更高版本。不支持早期的Reactor版本(包括3.0.1.RELEASE,3.0.2.RELEASE和3.0.3.RELEASE)。spring-cloud-stream-reactive将会过渡地检索正确的版本,但项目结构有可能将io.projectreactor:reactor-core的版本管理到较早版本,特别是在使用Maven时。对于通过Spring Initializr(Spring Boot 1.x)生成的项目,这将覆盖Reactor版本为2.0.8.RELEASE。在这种情况下,您必须确保释放正确版本的工件。这可以通过在io.projectreactor:reactor-core上直接依赖于3.0.4.RELEASE或更高版本的项目来简单地实现。
注意术语reactive的使用目前指的是正在使用的反应性API,而不是执行模型是无效的(即,绑定的端点仍然使用“推”而不是“拉”模型)。虽然通过使用Reactor提供了一些背压支持,但我们希望长期以来通过使用连接的中间件的本机反应客户端来支持完全无反应的管道。
基于反应器的处理程序
基于反应器的处理程序可以具有以下参数类型:
对于用@Input注释的参数,它支持反应器类型Flux。入站通量的参数化遵循与单个消息处理相同的规则:它可以是整个Message,一个可以是Message有效负载的POJO,也可以是一个POJO基于Message内容类型头的转换。提供多个输入;
对于使用Output注释的参数,它支持将方法生成的Flux与输出连接的类型FluxSender。一般来说,仅当方法可以有多个输出时才建议指定输出作为参数;
基于反应器的处理程序支持Flux的返回类型,其中必须使用@Output注释。当单个输出通量可用时,我们建议使用该方法的返回值。
这是一个简单的基于反应器的处理器的例子。
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
return input.map(s -> s.toUpperCase());
}
}
使用输出参数的同一个处理器如下所示:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
public void receive(@Input(Processor.INPUT) Flux<String> input,
@Output(Processor.OUTPUT) FluxSender output) {
output.send(input.map(s -> s.toUpperCase()));
}
}
RxJava 1.x支持
RxJava 1.x处理程序遵循与基于反应器的规则相同的规则,但将使用Observable和ObservableSender参数和返回类型。
所以上面的第一个例子会变成:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
@Output(Processor.OUTPUT)
public Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(s -> s.toUpperCase());
}
}
上面的第二个例子将会变成:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
public void receive(@Input(Processor.INPUT) Observable<String> input,
@Output(Processor.OUTPUT) ObservableSender output) {
output.send(input.map(s -> s.toUpperCase()));
}
}