Reactor 3-响应式编程-WHAT

转载请注明出处
作者:@宫三千

是什么?

响应式编程是一种编程的思维模式,区别于声明式编程,响应式编程更注重数据流转,每一段程序都是围围绕着数据来设计的,原始数据,在哪,经过那些处理之后,最终变成什么样子。

像这样围绕着数据流转的程序,我们可以简称为执行序列。应用响应式编程最直接的方式就是从以往的声明式程序中找出核心数据,或者说是抽象出核心数据,然后把程序的执行过程转变为对核心数据的执行序列,最终,如果是查询数据的程序,就返回当前的执行序列如果是执行命令的程序,则可以返回当前序列的后续(then())。

为什么不直接返回生成好的数据,而要返回但前的执行序列呢?原因有如下几点(可能还有其他的,但是暂时没想到)

  • 首先,你不知道该程序要在哪执行,或者说在哪个线程来执行
  • 再者,这是响应式编程的特性,当且仅当调用订阅方法( Flux#subscribe() )的时候,序列才会真正的执行
  • 最后,从设计的角度讲,响应式编程的数据返回值,实际上返回的是一个数据的发布者( Publisher),调用方相当于是数据的订阅者( Subscriber )(这里还存在冷热序列,和推拉模式的区别,后续再说),只有当订阅者去订阅数据的时候,发布者提供的数据以及相应的执行序列才会真正被执行,有另外一种叫法叫做 Lazy Compute ,延迟计算。
  • 最后的最后,统一的设计模式,有助于思维贯通,以响应式编程的方式去思考你的所有实现。

给你的API分门别类

有个微服务的最佳实践模式命令查询职责隔离模式(CQRS),在该模式当中,将客户端的请求分为两类:

  • 命令(Command)
  • 查询(Query)

在响应式编程设计中,我们也可以使用相同的概念。

该模式还有一个概念,叫做事件溯源(Event Sourcing),服务和服务之间除了命令和查询之外,还有事件,和响应式编程的模式非常契合,我也是因为之前学习过这个模式,才被启发想到需要给API分门别类的。

响应式编程的事件类型是固定的,通过调用接口去传递事件(next事件,error事件,subscribe事件,complete事件,事件也可以称之为状态,其实都是一回事儿,只不过抽象方法以及场景不一样,所以有不同的叫法)

命令

简单一点说,所有返回类型为 void 的接口都是命令式接口

一般这样的接口,使用响应式变编程的思维,需要返回一个内部执行序列的后续,所以说,所有void方法的返回值可以统统是Mono<Void> ,该对象代表了该接口提供的针对于传入参数的后续执行序列。

//声明式编程
public void runSomething(){doSomthing();}

//响应式编程
public Mono<Void> runSomthing(){return Mono.fromRunnable(()-> doSomthing());}

查询

简单一点说,所有返回类型不是 void 的接口都是查询接口

一般这样的接口,需要区分一下返回的是单个数据,还是一组数据,如果是单个数据,可以使用Mono<T>来表示,如果是一组数据,可以使用Flux<T> 来表示,这两个对象代表了该接口提供的针对传入参数的数据查询操作的执行序列。

其实查询操作使用数据发布者的角色来考虑更好理解:该接口返回了针对于传入参数的数据发布者。调用者可以通过订阅该发布者去获取数据。

//声明式编程
public <T> T getOne(){return t}

//响应式编程
public <T> Mono<T> getOne(){return Mono.fromSupplier(()-> t);}

拼接

简单一点说,所有不直接传递参数值,而是提供一个参数的容器,或者把参数封装成输入,并且返回响应的容器或者输出的接口,都是拼接接口(和map操作有点像)。

使用Java8的StreamAPI也可以使用这样的方式去设计API,避免重复声明逻辑一样的流定义。

public <R,T> Mono<R> mapSomthing(Mono<T> monoT){return monoT.map(()->v);}

订阅者还是发布者?

响应式编程更注重数据的流转,那么,谁才是数据的发布者,谁又是数据的订阅者呢?当然这个问题会随着业务逻辑的不同而有所变化。发布者也可以是其他发布者的订阅者,订阅者也有可能是其他订阅者的发布者,这样就会形成一个链条,将不同的组件链接起来,实现某些业务。

通常的,我认为,每个独立的组件或者模块,不论大小,在响应式编程的设计思路下,都需要抽象出内部的数据发布者,和数据订阅者,其实就是输入和输出(Input and Output),嗯,这是应该是很正常的事情。发布者和订阅者也是输入和输出的其中一种抽象方式。

这里想再多说一点关于输入和输出的话题,个人感觉,任何程序,不论粒度的大小(代码块,函数,类,模块,组件,服务,中间件,框架等等)在进行设计的时候都需要考虑这段程序的的输入是什么,输出是什么,以及输入和输出的方式,函数自然不用说,就是传参和返回值,想要改变这种简单的输入输出模式,只能通过使用某些模式来抽象参数和返回值的方式来扩展输入输出的功能。这里说到输入,响应式编程,还有另外一种思考方式,上下游 ,传递参数的时候,直接给一个明确的值,当然也可以传递某些执行序列的上游信息,然后方法内部实现是提供针对某些执行序列的下游执行序列,然后返回一个新的执行序列。Reactor3中的执行序列,当你在添加下游执行单元的时候,会返回一个新的执行序列对象,其中包裹着上游执行序列对象,内部称之为source,有点像是装饰者模式,没有太深入看源码,所以不太确定。

所以除了命令查询两种方法之外,还有另一种,应该叫做 拼接 方法(我自己起的名字)。拼接式接口就像上边所说的,传一个上游序列,拼接新的执行单元,然后返回新的执行序列,方便复用。

在使用Spring WebFlux 实现websocket的时候有个WebSocketHandler就和这个拼接方法类似,不过那个是热序列的拼接,比较不好理解。

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession.getHandshakeInfo();
        Mono<Void> in = webSocketSession
                .receive()
                .doOnNext(webSocketMessage -> {
                    String text = webSocketMessage.getPayloadAsText();
                    LOGGER.info("WebSocket msg:{}", text);
                })
                .then();

        Mono<Void> out = webSocketSession.send(webSocketMessagePublisher(webSocketSession));
        return Mono.zip(in, out).then();
    }

handle方法传进来的是个WebSocketSession对象,调用receive()方法返回的是一个Flux<WebSocketMessage>对象,我们该如何理解这个Flux对象呢?

可以理解为一个执行序列的上游信息,我们可以在其基础之上添加相应的执行单元,因为这是个WebSocketSession,所以其实是个热序列(会不断生成新数据,直到断开链接),所以我们定义的下游执行单元需要指定,当收到新增数据的时候,我们应该执行什么操作,这也是个典型的push模式,这里调用doOnNext()方法传递一个Consumer对象其实是在定义Subscriber的onNext()方法在收到新数据的时候需要执行的操作,用发布订阅的模式去理解这个过程就是,参数传递进来的是个数据发布者,方法内部的程序是数据订阅者,订阅者要向发布者订阅数据,但是不是真正的订阅,而只是提供一个订阅之后的执行逻辑,因为从整体来说的话,该方法只是其中的一个执行单元,如果在这里调用subscribe方法,那后续再添加任何执行单元就都没有什么用了。所以这里其实是有个原则的。

只有可以确认当前序列不会再有下游信息的时候,才能最终调用Flux#subscribe()方法去执行序列。可能不太好理解,不过慢慢的就会理解了。(对于幂等的序列,可以不用考虑这个原则,例如通过 Mono.just() 或 Flux.just() 等方法创建的序列)**

发布订阅模式(PubSub)

企业微信截图_8cc99873-29b0-4ffb-b698-646fc7ace807.png

序列的开始,是由订阅者调用发布者的subscribe方法,首先publishier会创建一个subscription对象,并回调订阅者的onSubscribe方法,将subscription注入到订阅者的体内,当订阅者需要从发布者那里获取数据的时候,订阅者可以调用Subscription对象的request方法,发布者生成指定数量的数据之后,循环调用subscriber对象的onNext方法用来传递数据,当发布的过程产生错误,发布者会调用订阅者的onError方法来传递错误,当发布者发布完成,会调用订阅者onComplete方法代表发布完成了,之后,发布者将不会再次发布数据,这个流程就结束了。

以上是标准的冷序列的调用直径流程。热序列和这个流程有点不一样并且不太容易理解,所以我们到后续的章节再单独进行说明。

发布订阅模式可能更多的是应用于进程之间或者线程之间的交互,其中通过阻塞队列或相应的消息中间件去实现消息的容器,但是在响应式编程当中,该模式的主要作用,就是进行连接,将数据从一个组件传递到另外一个组件,而且默认是没有像阻塞队列那样的数据容器,除非手动指定buffer模式,后边介绍各种Operator的时候会说到buffer模式。

以发布订阅模式的出发点去思考的话,应对理解响应式编程有帮助。

一个数据提供者对象,都能提供哪些数据,这些数据通过什么方式传递?

一个可执行命令的对象,都能提供哪些命令的执行?这些命令什么时候去执行,我能否先获得所有需要执行的命令,再根据一些规则,去定制他们的执行顺序?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容