响应式编程入门之 Project Reactor

本文目标

  • 理解响应式编程

前言

之前的《聊聊 IO 多路复用》中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)

本文以 Reactor 为例带大家入门响应式编程

版本

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.6</version>
    </dependency>

什么是 Reactor

Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。
在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。

上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。

回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。

从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?

// 以下案例来自 Reactor 官网
userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂

响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。

响应式编程内幕

Reactor 实现了 org.reactivestreams 提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。

下图展示了 reactivestreams 中的核心接口

reactivestreams 核心接口
  • Publisher:发布者

  • Subscriber:订阅者

  • Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介

  • Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)

在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的

响应式编程执行逻辑

在 Reactor 中大部分实现都是按照上图的逻辑来执行的

  1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
  2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
  3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
  5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * @author tianwen.yin
 */
public class SimpleReactiveStream {

    /**
     * 实现一个简单的响应式编程发布者
     * 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成
     */
    static class SimplePublisher implements Publisher {
        @Override
        public void subscribe(Subscriber s) {
            // 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe
            s.onSubscribe(new SimpleSubscription(data(), s));
        }

        private String data() {
            return "Hello World";
        }
    }

    static class SimpleSubscriber implements Subscriber {
        @Override
        public void onSubscribe(Subscription s) {
            // 3. Subscriber 通过 Subscription#request 来请求数据
            // 或者 Subscription#cancel 来取消数据发布
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object o) {
            System.out.println(o);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("error");
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }

    static class SimpleSubscription implements Subscription {
        String data;
        Subscriber actual;
        boolean isCanceled;

        public SimpleSubscription(String data, Subscriber actual) {
            this.data = data;
            this.actual = actual;
        }

        @Override
        public void request(long n) {
            if (!isCanceled) {
                try {
                    // 4. Subscription 在接收到订阅者的调用后
                    // 通过 Subscriber#onNext 向下游订阅者传递数据
                    actual.onNext(data);
                    // 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流
                    actual.onComplete();
                } catch (Exception e) {
                    // 如果数据发布或者处理遇到错误会调用 Subscriber#onError
                    actual.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    public static void main(String[] args) {
        // 1. Subscriber ”订阅“ Publisher
        new SimplePublisher().subscribe(new SimpleSubscriber());
    }

}

响应式编程思想

响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。

image.png

这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。

Reactor 中的 Operator

Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)

    Mono.just("hello")
            .map(a -> a + "world")
            .subscribe(System.out::println);

举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码

    // 注意,这是我基于 Reactor API 实现的伪代码!
    public static class MonoMap implements Publisher {
        // 我们自定义的转换逻辑
        private Function mapper;
        // source 代表当前操作符的上游发布者
        private Publisher source;

        public MonoMap(Publisher source, Function mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Subscriber actual) {
            source.subscribe(new MonoMapSubscriber(mapper, actual));
        }
    }

    public static class MonoMapSubscriber implements Subscriber {
        // 我们自定义的转换逻辑
        private Function mapper;
        // 真正的下游
        private Subscriber actual;

        public MonoMapSubscriber(Function mapper, Subscriber actual) {
            this.mapper = mapper;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(Object o) {
            // 当上游数据发送过来时,先进行转换再发送给下游
            Object result = mapper.apply(o);
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }

上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨

我们首先来分析一下 Mono.just("hello").map(a -> a + "world") 这句话

  1. 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"

  2. 当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)

    • 当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)

    • 当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...

最后通过一张图来总结一下

Operator 实现原理

Reactor 该如何学习

本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的

如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。

最后

如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容