异步编程三:reactor模式

书接上回,我们一起体验了promise模式,也了解到了其解决什么场景下的问题。
本篇文章的目的之一即回答好两个问题:

  • reactor模式解决什么场景下的问题
  • reactor解决问题的场景与promise模式有哪些不同,有哪些重叠

另外在概念层面,本篇文章希望能够解释清楚reactor领域一些常见的概念
实践层面,带读者体验一下reactor模式的写法,抛砖引玉

案例实现

还是promise文章里的案例:基于计算器服务,实现一个接口,接口实现计算 a + ((b -c)+ d) -e -f + g
本文实现依然是基于vertx和kotlin来做,只是异步结果编排部分替换为reactor实现:

class ReactorVerticle : AbstractVerticle(){

    lateinit var webClient: WebClient

    override fun start() {
        webClient = WebClient.create(vertx)
        var eventBus = vertx.eventBus()

        eventBus.consumer<JsonObject>("calc.reactor"){ msg ->
            var msgBody = msg.body()
            var a = msgBody.getInteger("a", 0)
            var b = msgBody.getInteger("b", 0)
            var c = msgBody.getInteger("c", 0)
            var d = msgBody.getInteger("d", 0)
            var e = msgBody.getInteger("e", 0)
            var f = msgBody.getInteger("f", 0)
            var g = msgBody.getInteger("g", 0)

            (b asyncSub c)
                .flatMap { it asyncAdd d }
                .flatMap { it asyncAdd a }
                .flatMap { it asyncSub e }
                .flatMap { it asyncSub f }
                .flatMap { it asyncAdd g }
                .doOnError {  msg.fail(500, it.message) }
                .subscribe { msg.reply(it.toString()) }

        }

    }

    infix fun Int.asyncAdd(input : Int) : Mono<Int> {
        return calc(this, input, CalcOperator.add)
    }

    infix fun Int.asyncSub(input : Int) : Mono<Int> {
        return calc(this, input, CalcOperator.sub)
    }

    /**
     * 所有异常必须被处理
     */
    fun calc(a: Int, b: Int, operator: CalcOperator) : Mono<Int> {

        return Mono.create { sink ->
            webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
                .expect(ResponsePredicate.SC_OK).send{
                    if (it.succeeded()) {
                        try{
                            var addResult = it.result().bodyAsString().toInt()
                            sink.success(addResult)
                            println("reactor calc: $a - $b = $addResult")
                        } catch (e: Exception) {
                            sink.error(e)
                        }
                    } else {
                        sink.error(it.cause())
                    }
                }
        }
    }
}

与promise模式实现的代码风格很像,在当前案例这种场景下可以说reactor模式可以做到和promise模式一样的效果。
但是reactor不仅仅解决promise的场景。

案例变种一

实现计算 a + ((b -c)+ d) -e -f + g,当 b-c > 5 时取b-c的结果,否则以6作为b-c的结果
以reactor模式实现代码如下:

(b asyncSub c)
                .filter{ it > 5 }
                .switchIfEmpty(Mono.just(6))
                .flatMap { it asyncAdd d }
                .flatMap { it asyncAdd a }
                .flatMap { it asyncSub e }
                .flatMap { it asyncSub f }
                .flatMap { it asyncAdd g }
                .doOnError {  msg.fail(500, it.message) }
                .subscribe { msg.reply(it.toString()) }

当然以promise模式来做的话,代码优化一下,应该也不会太丑,比如:

(b asyncSub c).thenCompose {
    var promise = CompletableFuture<Int>()
    if(it > 5){
        promise.complete(it)
    } else {
        promise.complete(6)
    }
    promise
}
.thenCompose { it asyncAdd d }
.thenCompose { it asyncAdd a }
.thenCompose { it asyncSub e }
.thenCompose { it asyncSub f }
.thenCompose { it asyncAdd g }
.thenAccept { msg.reply(it.toString()) }
.exceptionally {
    msg.fail(500, it.message)
    null
}

但是上面用到的filter和switchIfEmpty组合只是reactor里的两个操作符号而已,reactor里还有很多很丰富的各种操作符

最初的问题

下面来解答最开始的两个问题

  • reactor模式解决什么场景下的问题
  • reactor解决问题的场景与promise模式有哪些不同,有哪些重叠

reactor可以解决promise场景下的问题,而且解决方案更加优雅,并形成标准;有很多现成的轮子(各种操作符)拿来即用。但reactor不只是为了解决promise面对的问题的,他解决的问题笔者归纳如下(个人观点,欢迎讨论):

  • reactor是一种编程模式,与面向对象编程是一个级别的,他的存在是为了解决具体的某一类问题,但又不是解决特定问题的(能简单说一下面向对象编程解决什么问题吗?);简单来说reactor模式关注的是对数据的处理,把一坨数据通过各种操作符号转换为另一坨数据;数据在处理过程中又支持各种异步处理,支持对异步处理的结果进行编排,同时编程风格上推崇链式,所以看上去代码干净一些
  • 上面一点里的支持对异步结果的编排顺手解决异步编程模式下,的回调地狱问题(与promise一样)

说法有点绕,reactor和promise本身是两个东西,promise顶多算一种设计模式,而reactor是一种编程风格;因为本系列文章从异步编程角度谈起,所以故事一路讲来把reactor和promise进行了一轮比较。

下面顺便引用两处权威文档:

asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
-- https://en.wikipedia.org/wiki/Reactive_programming

reactor核心概念串讲

下面用两张图来讲一下reactor模式;第一张图,两个工人,左边的是publisher、右边的是subscriber,中间是一个流水线, 流水线上有四道工序:

  • 变蓝色
  • 变三角形
  • 过滤尺寸太大的长方形
  • 只要前九个


    image.png

    那么经过如上的流水线,最终结果会是什么样子呢?


    image.png

    其中流水线中每一道工序都可以是异步的,那么流水线相当于对异步处理的结果按照预先设定好的逻辑进行了编排。
    下面把图中的角色对应到reactor模式里去:
  • publisher: publisher
  • subscriber: subscriber
  • operator: 各个工序都属于operator

reactive stream

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/

reactor stream 定义了一套 接口project reactor 实现了这套接口并进行了一些拓展

所以基于reactor开发的代码,各种方法返回的Mono或者Flux都是属于publisher,publisher会被调用者subscribe到具体的subscriber,这个过程就是搭建流水线的过程。如果有0到1个货物要处理就用Mono;如果有0到n个货物要处理就用Flux。
这里只大概讲一下概念,更深入的内容建议熟读五遍 projectreactor文档,这个文档里会介绍为什么要用reactor,如何使用,如何选择操作符等各种问题。

reactor模式与http服务

java生态这么多年一路走来,做web服务,最原始大部分基于servlet模型,都是阻塞式io来做,与异步编程扯不上关系。随着时代的发展,对性能要求越来越高,高并发的web服务似乎是java的一个软肋。但基于异步编程,也衍生出了一些解决方案,可以与reactor模式一起玩的,列举常用的三种:

  • spring webflux (听这名字 web的flux!)
  • vertx 上面的示例代码就是基于vertx来写的
  • reactor netty,这个笔者并没有体验过

上面的三个每一个都可以单独列一个话题谈好多,暂不展开

缺点

reactor模式虽然看起来狂拽酷炫,漫不经心就把回调地狱给解决了,但是在复杂业务场景下,比如各种...if...else...嵌套的场景,如果想要代码保持清晰性、可读性,是非常考验设计功底和编码功底的,当然对笔者来说这一点亦是异步编程的魅力所在。
对程序员个人来讲,多接触一些思想和模式,有助于开阔思路,融会贯通;
但是对于公司来讲,做项目讲究性价比,采用入门门槛高的编程方式,就需要招能力强的程序员,反之只需要招一些刚刚毕业的实习生即可。
异步编程,我们从回调、promise聊到reactor,有没有其他可选方案呢?

协程,支持以同步的方式写异步的代码

笔者认为这也是最近这几年golang火起来的一个主要原因。
另外jvm体系kotlin也支持协程,与golang的协程在玩法上大不相同。
下一篇文章,我们来一起聊一聊协程。

参考文章

知乎上关于函数式编程的讨论
ReactiveX 文档翻译
projectreactor文档

系列文章快速导航:
异步编程一:异步编程的魅力
异步编程二:promise模式
异步编程三:reactor模式
异步编程四:协程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容