什么是响应式编程
响应式编程是一种新的编程范式。其使用函数式编程的方式构建异步处理管道。
举个例子🌰
Flux.just(context)
.publishOn(Schedulers.boundedElastic())
.filter(c -> mergeHandler.handle(c)) // 已合并的消息提前过滤
.delaySequence(Duration.ofMillis(messageConfigDataManager.getTimeWindowMilliSeconds())) // 可变时间窗口
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(cxt -> {
mergeHandler.removeCache(context);
next(context);
})
.subscribe();
响应式宣言
来自不同领域的组织正在不约而同地发现一些看起来如出一辙的软件构建模式。它们的系统更加稳健,更加有可回复性,更加灵活,并且以更好的定位来满足现代的需求。这些变化之所以会发生,是因为近几年的应用需求出现了戏剧性的变化。仅仅在几年之前,大型应用意味着数十台服务器,数秒的响应时间,数小时的离线维护时间以及若干 GB 的数据。而在今天,应用被部署在一切场合,从移动设备到基于云的集群,这些集群运行在数以千计的多核心处理器的之上。用户期望毫秒级的响应时间以及 100%的正常运行时间。数据则以 PB 为单位来衡量。
昨天的软件架构已经完全无法地满足今天的需求。我们相信,一种条理分明的系统架构方法是必要的,而且我们相信关于这种方法的所有必要方面已经逐一地被人们认识到:我们需要的系统是反应式的,具有可回复性的,可伸缩的,以及以消息驱动的。我们将这样的系统称之为反应式系统。以反应式系统方式构建的系统更加灵活,松耦合和可扩展。这使得它们更容易被开发,而且经得起变化的考验。它们对于系统失败表现出显著的包容性,并且当失败真的发生时,它们能用优雅的方式去应对,而不是放任灾难的发生。反应式系统是高度灵敏的,能够给用户以有效的交互式的反馈。
反应式系统具有如下特质
即时响应性(Responsive)
只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。
回弹性(Resilient)
系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。
弹性(Elastic)
系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。
消息驱动(Message Driven)
反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。
Reactive Streams API
Reactive Streams 是响应式编程做异步流处理的实施标准,类比 JDBC.
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Reactive Library 分代
第 0 代 java.util.Observable 第 1 代 rx.NET, Reactive4Java,早期的 RxJava 第 2 代 RxJava 第 3 代 RxJava2, Project Reactor, Akka-Streams 第 4 代 RxJava2,Project Reactor 2.5+
Project Reactor
Implementing Reactive Streams API and Reactive Extensions Interacts with Java 8 functional API, Completable Future, Stream and Duration Fully non-blocking,backpressure-ready network engines for HTTP (including Websockets), TCP and UDP.
Flux
Flux用来创建0或N个元素
// Create a Flux that completes without emitting any item.
static <T> Flux<T> empty()
// Create a new Flux that emits the specified item(s) and then complete.
static <T> Flux<T> just(T... data)
// Create a Flux that emits the items contained in the provided Iterable.
static <T> Flux<T> fromIterable(Iterable<? extends T> it)
// Create a Flux that completes with the specified error.
static <T> Flux<T> error(Throwable error)
// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.
static Flux<Long> interval(Duration period)
static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
Mono
Mono用来创建0或1个元素
// Create a Mono that completes without emitting any item.
static <T> Mono<T> empty()
// Return a Mono that never emits any signal
static <T> Mono<T> never()
// Create a Mono from an available (unique) value
static <T> Mono<T> just(T data)
// Create a Mono that emits an Exception
static <T> Mono<T> error(Throwable error)
Reactor Example
private static List<String> words = Arrays.asList( "the", "quick", "brown", "fox","jumped", "over", "the", "lazy", "dog");
Flux.fromIterable(words)
.flatMap(word -> Flux.fromArray(word.split("")))
.concatWith(Mono.just("s"))
.distinct()
.sort()
.zipWith(Flux.range(1, Integer.MAX_VALUE), (string, count) ->
String.format("%2d. %s", count, string)
)
.subscribe(System.out::println);
Reactive vs Java 8 Stream
Stream is pull-based,Reactive is push-based Stream can only be used once, Reactive can be subscribed many times Stream#parallel() 使用 fork-join 并发,Reactive 使用 Event Ioop
Reactor Operators 300+
Creating a New Sequence…
Transforming an Existing Sequence
Peeking into a Sequence
Filtering a Sequence
Handling Errors
Working with Time
Splitting a Flux
Going Back to the Synchronous World
Marble Diagrams
响应式编程解决什么问题
程序通过操作系统线程,每个请求一个线程执行的模式,带来了很大的性能问题,这个问题被响应式编程的方式解决了。但与此同时,也带来了很多开发和维护上的问题,因为响应式编程的代码更难测试。
设计 NIO 编码,业界主流的编码方式主要有以上几种,通过 CompletableFuture 和 Lambda 表达式,可以快速实现轻量业务异步封装与编排,与 Callback 相比可以避免方法多层嵌套问题,但面对相对复杂业务逻辑时仍存在以下局限:
难以简单优雅实现多异步任务编排;
难以处理实时流式场景;
难以支持高级异常处理;
不支持任务延迟执行。
使用 Reactor 还有另外一个好处,那就是统一异步编程模型。比如有的异步编程框架提供 ListenableFuture,有的是 CompletableFuture,还有 gRPC、dubbo、webflux 等中间件框架,都提供了自己的异步编程模型实现。如果直接针对各个框架自己的原生实现进行异步编程,将会存在不同风格的代码。而 Reactor 是响应式库的当前标准,使用 Reactor 库可以封装不同异步编程框架的异构实现,使用统一的 API 执行异步编程
总结
随着云原生浪潮的到来以及物联网、边缘计算的推进,未来应用间网络通讯的量级将会变得越来越庞大,网络 IO 会是系统架构中的一等公民。如何使我们的应用能够具有更高的性能和更健壮的特性,以及如何降低硬件资源的成本,这些挑战将促使应用开发人员不断的学习实践类似 reactive 相关的技术。而在学习实践的过程中,对经典的 servlet 架构的优化重构一定是具有代表性意义的。
在适合的业务场景下,响应式技术架构能够有效提升服务吞吐能力,降低业务编排复杂度,帮助构建云原生时代整体系统快速即时反应能力。但同时构建 Reactive 模式的程序也为开发者带来更高的要求,面临比同步更为复杂的编程模型,需要更好的处理好阻塞和写出更优秀的异步代码。
参考资料
官网
响应式编程介绍
https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro
https://www.reactivemanifesto.org/
响应式编程解决什么问题
https://www.infoq.cn/article/xycwyk9*tfmpfno6rkwt
API文档
https://projectreactor.io/docs/core/release/api/
云栖社区响应式编程介绍
https://developer.aliyun.com/article/682897
Reactive 模式在 Trip.com 消息推送平台上的实践
https://www.infoq.cn/article/stgdyinb2le4fpzxix1y
Netty4 实战精华EventLoop 和线程模型