响应式编程

什么是响应式编程

响应式编程是一种新的编程范式。其使用函数式编程的方式构建异步处理管道。

举个例子🌰

Flux.just(context)
        .publishOn(Schedulers.boundedElastic())
        .filter(c -> mergeHandler.handle(c))    // 已合并的消息提前过滤
        .delaySequence(Duration.ofMillis(messageConfigDataManager.getTimeWindowMilliSeconds()))    // 可变时间窗口
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(cxt -> {
            mergeHandler.removeCache(context);
            next(context);
        })
        .subscribe();

响应式宣言

来自不同领域的组织正在不约而同地发现一些看起来如出一辙的软件构建模式。它们的系统更加稳健,更加有可回复性,更加灵活,并且以更好的定位来满足现代的需求。这些变化之所以会发生,是因为近几年的应用需求出现了戏剧性的变化。仅仅在几年之前,大型应用意味着数十台服务器,数秒的响应时间,数小时的离线维护时间以及若干 GB 的数据。而在今天,应用被部署在一切场合,从移动设备到基于云的集群,这些集群运行在数以千计的多核心处理器的之上。用户期望毫秒级的响应时间以及 100%的正常运行时间。数据则以 PB 为单位来衡量。

昨天的软件架构已经完全无法地满足今天的需求。我们相信,一种条理分明的系统架构方法是必要的,而且我们相信关于这种方法的所有必要方面已经逐一地被人们认识到:我们需要的系统是反应式的,具有可回复性的,可伸缩的,以及以消息驱动的。我们将这样的系统称之为反应式系统。以反应式系统方式构建的系统更加灵活,松耦合和可扩展。这使得它们更容易被开发,而且经得起变化的考验。它们对于系统失败表现出显著的包容性,并且当失败真的发生时,它们能用优雅的方式去应对,而不是放任灾难的发生。反应式系统是高度灵敏的,能够给用户以有效的交互式的反馈。

反应式系统具有如下特质

即时响应性(Responsive)

只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

回弹性(Resilient)

系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。

弹性(Elastic)

系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

消息驱动(Message Driven)

反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

image.png

Reactive Streams API

Reactive Streams 是响应式编程做异步流处理的实施标准,类比 JDBC.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s); 
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {  
    public void request(long n);  
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Reactive Library 分代

第 0 代 java.util.Observable 第 1 代 rx.NET, Reactive4Java,早期的 RxJava 第 2 代 RxJava 第 3 代 RxJava2, Project Reactor, Akka-Streams 第 4 代 RxJava2,Project Reactor 2.5+

Project Reactor

Implementing Reactive Streams API and Reactive Extensions Interacts with Java 8 functional API, Completable Future, Stream and Duration Fully non-blocking,backpressure-ready network engines for HTTP (including Websockets), TCP and UDP.

Flux

Flux用来创建0或N个元素

// Create a Flux that completes without emitting any item.
static <T> Flux<T> empty()

// Create a new Flux that emits the specified item(s) and then complete.
static <T> Flux<T> just(T... data)

// Create a Flux that emits the items contained in the provided Iterable.
static <T> Flux<T> fromIterable(Iterable<? extends T> it)

// Create a Flux that completes with the specified error.
static <T> Flux<T> error(Throwable error)

// Create a new Flux that emits an ever incrementing long starting with 0 every period on the global timer.
static Flux<Long> interval(Duration period)

static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

Mono

Mono用来创建0或1个元素

// Create a Mono that completes without emitting any item.
static <T> Mono<T> empty()

// Return a Mono that never emits any signal
static <T> Mono<T> never()

// Create a Mono from an available (unique) value
static <T> Mono<T> just(T data)

// Create a Mono that emits an Exception
static <T> Mono<T> error(Throwable error)

Reactor Example

private static List<String> words = Arrays.asList(  "the", "quick", "brown", "fox","jumped", "over", "the", "lazy", "dog");

Flux.fromIterable(words)
    .flatMap(word -> Flux.fromArray(word.split("")))
    .concatWith(Mono.just("s"))
    .distinct()
    .sort()
    .zipWith(Flux.range(1, Integer.MAX_VALUE),  (string, count) ->
        String.format("%2d. %s", count, string)
    )
    .subscribe(System.out::println);

Reactive vs Java 8 Stream

Stream is pull-based,Reactive is push-based Stream can only be used once, Reactive can be subscribed many times Stream#parallel() 使用 fork-join 并发,Reactive 使用 Event Ioop

Reactor Operators 300+

  • Creating a New Sequence…

  • Transforming an Existing Sequence

  • Peeking into a Sequence

  • Filtering a Sequence

  • Handling Errors

  • Working with Time

  • Splitting a Flux

  • Going Back to the Synchronous World

Marble Diagrams

image.png

响应式编程解决什么问题

程序通过操作系统线程,每个请求一个线程执行的模式,带来了很大的性能问题,这个问题被响应式编程的方式解决了。但与此同时,也带来了很多开发和维护上的问题,因为响应式编程的代码更难测试。

设计 NIO 编码,业界主流的编码方式主要有以上几种,通过 CompletableFuture 和 Lambda 表达式,可以快速实现轻量业务异步封装与编排,与 Callback 相比可以避免方法多层嵌套问题,但面对相对复杂业务逻辑时仍存在以下局限:

  • 难以简单优雅实现多异步任务编排;

  • 难以处理实时流式场景;

  • 难以支持高级异常处理;

  • 不支持任务延迟执行。

使用 Reactor 还有另外一个好处,那就是统一异步编程模型。比如有的异步编程框架提供 ListenableFuture,有的是 CompletableFuture,还有 gRPC、dubbo、webflux 等中间件框架,都提供了自己的异步编程模型实现。如果直接针对各个框架自己的原生实现进行异步编程,将会存在不同风格的代码。而 Reactor 是响应式库的当前标准,使用 Reactor 库可以封装不同异步编程框架的异构实现,使用统一的 API 执行异步编程

总结

随着云原生浪潮的到来以及物联网、边缘计算的推进,未来应用间网络通讯的量级将会变得越来越庞大,网络 IO 会是系统架构中的一等公民。如何使我们的应用能够具有更高的性能和更健壮的特性,以及如何降低硬件资源的成本,这些挑战将促使应用开发人员不断的学习实践类似 reactive 相关的技术。而在学习实践的过程中,对经典的 servlet 架构的优化重构一定是具有代表性意义的。

在适合的业务场景下,响应式技术架构能够有效提升服务吞吐能力,降低业务编排复杂度,帮助构建云原生时代整体系统快速即时反应能力。但同时构建 Reactive 模式的程序也为开发者带来更高的要求,面临比同步更为复杂的编程模型,需要更好的处理好阻塞和写出更优秀的异步代码。

参考资料

官网

https://projectreactor.io/

响应式编程介绍

https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro

响应式宣言

https://www.reactivemanifesto.org/

响应式编程解决什么问题

https://www.infoq.cn/article/xycwyk9*tfmpfno6rkwt

API文档

https://projectreactor.io/docs/core/release/api/

云栖社区响应式编程介绍

https://developer.aliyun.com/article/682897

Reactive 模式在 Trip.com 消息推送平台上的实践

https://www.infoq.cn/article/stgdyinb2le4fpzxix1y

Netty4 实战精华EventLoop 和线程模型

https://developer.aliyun.com/article/635908

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容