Java9特性-响应式流(Reactive Stream)

什么是流

形象的比喻来说就是如同水一样绵绵不绝的数据形式。而抽象点来说,是有一个生产者(source)产生,由一个或者多个消费者(sink)消费的数据元素(item)序列。那从这个抽象的描述就可以看出,使用流来承担数据交互的模式就是咱们经常说的生产者/消费者模型,而这种模型也可以称之为发布者/订阅者模型(后文将使用这个名字,因为JDK中使用的是这个名字)。

对于流数据来说,一般有两种的数据流转方式:

  • 拉(pull)数据模式:订阅者向发布者索要数据。
  • 推(push)数据模式:发布者向订阅者推送数据(push)。

这两种模式都是描述的单次信息传递的方式。如果发布者产生信息的速度和订阅者消费信息的速度一致的话,那这两种方法都将是十分有效的数据流转方式。

流有什么问题

流的问题在于当两端的速度不匹配的时候(考虑一下各种mq主要处理的问题削峰平谷)。而速度的不匹配自然存在以下两种情况:

订阅者消费速度快

这种情况的时候会出现订阅者有处理能力了,但是订阅者无信息可以处理的情况。如果这种时候是同步的调用模式,则订阅者将会阻塞,直到有新的信息可以进行处理。而如果这时候是异步的信息处理模式,则订阅者可以在无消息处理的时候挂起,直接切换到其他的任务处理中(对于多核CPU的多线程来说)。也就是说,对于这种情况,比较理想的是异步推模式

发布者发布速度快

当发布者发布速度快的时候,会发生订阅者来不及处理数据的情况。如果是同步的情况下发布者会一直阻塞,而如果是异步模式则对于订阅者来说有两种处理方式(可以类比一下线程池设计)可以处理:

  • 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
  • 不损失数据:加入队列缓存数据(订阅者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)

而还有另一种需要发布者加入的处理方式叫做背压(backpressure)。背压的实现方式是:由订阅者发出信号,让发布者降低信息的发布速度,从而让信息速度之间匹配。背压的优点是同样可以处理信息流速不一致问题。而更有意思的是,这时候信息的处理策略可以由发布者来选择:

  • 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
  • 不损失数据:加入队列缓存数据(发布者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)

没错,这两种情况是和订阅者一致的,不过选择权则由订阅者变成了发布者。

也就是说,在发布者发布速度快的时候,要么发布者直接同步阻塞,要么可以先根据消息的主要关心方(是发布者还是订阅者)来确定是否使用背压,然后再根据数据的类型判断是否接受数据丢弃(不丢弃可能会导致系统崩溃)。往往我们的发布者可以由上层的mq或者程序的应答机制保护消息的可用性。

那么结论是什么,我们需要异步非阻塞(订阅者消费快)、以及背压(发布者发布快)。

什么是响应式流

Reactive Streams 是一项非阻塞背压的异步流处理标准的倡议,当然,如果我这个翻译看的不清楚的话就还是看原文吧(http://www.reactive-streams.org/)。

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

响应式流(Reactive Streams)概念被提出是在2013年,旨在处理上一小节中由于流速问题而产生的几种问题:订阅者订阅者的阻塞、由订阅者(数据下游)来选择是依赖无限队列(数据不丢)或直接丢弃数据。

而对于一项标准而言,它的目是自然是用更少的协议来描述交互。而响应式流的模型也是十分简单:

  • 订阅者异步的向发布者请求N个元素。
  • 发布者一步的向订阅者发送M(0<M<=N)个元素。

基于这个模型,响应式流可以做到pull模型和push模型流处理机制之间动态切换。当订阅者较慢时,发送者没有新数量的请求则发布者进入等待请求信息的状态,相当于pull模型;当订阅者更快时,相当于发布者没有新的信息,订阅者进入到等待消息发送的状态相当于push模型。

Java中的响应式流

对于响应式流,在2015年的时候确定了关于其Java API,具体的详情也也可以参考上面的链接。其中定义了4个API,具体为:

  • Publisher<T>
  • Subscriber<T>
  • Subscription
  • Processor<T,R>

对他们的定义为:

Publisher(发布者)

是一个假定上游会产生无限数据的信息发布者。他们会向有发送请求的订阅者推送元素。

Subscriber(订阅者)

订阅者会从发布者那里领取令牌,然后根据令牌向发布者发送“获取请求”。同时当发布者部分准备好元素的时候,会通过令牌对订阅者进行调用,进行数据消费。

Subscription(令牌)

发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。

Processor(处理器)

可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了Publisher与Subscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。

一次完整的调用流程大概可以描述为:

  1. 订阅者向发布者发送订阅请求。
  2. 发布者根据订阅请求生成令牌发送给订阅者。
  3. 订阅者根据令牌向发布者发送请求N个数据。
  4. 发送者根据订阅者的请求数量返回M(M<=N)个数据
  5. 重复3,4
  6. 数据发送完毕后由发布者发送给订阅者结束信号

而Java API中的接口如下所示,其中所有的方法都是void,因为所有的方法都是异步执行的。

public interface Publisher<T> {
    //用于1.中订阅请求
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    //用于2.中回调发送令牌
    public void onSubscribe(Subscription s);
    //用于3.用于接受4中发送过来的数据
    public void onNext(T t);
    //用于3,4,5接收中间异常了之后的调用
    public void onError(Throwable t);
    //用于6.中结束信号的回调
    public void onComplete();
}
public interface Subscription {
    //用于3.的发送请求N个数据
    public void request(long n);
    //用于3,4,5订阅者异步的向
    public void cancel();
}
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

JDK中的响应式流

Java API中的流程使用方式看起来比较简单,但API背后的具体实现由于是全异步交互以及涉及具体背压处理而很困难。而JDK9中为用户提供了Publisher接口的简单实现,让开发人员可以基于此来扩展出自己的实际需求。

JDK 9中的响应式流功能提供在java.util.concurrent包下,全响应式流的API接口被封装到 Flow接口中,其中包括需要使用的接口以及静态方法,关于上一小节中接口方法的详细描述也可以参见该接口上的方法描述。其中的静态接口为:

Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription

除去上一小节说的4个接口外,Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为DEFAULT_BUFFER_SIZE = 256

除去Flow外,其中还有一个刚刚说到的Publisher的简单实现类SubmissionPublisher<T>。该接口在实现了publisher<T>之外还实现了AutoCloseable接口,所以可以直接用try块来进行资源的管理。

尽管JDK 9中没有提供Subscriber<T>的简单实现,但是在SubmissionPublisher<T>中提供了一个consume(Consumer<? super T> consumer)方法,用于让开发人员可以直接消费消息发布者的所有元素。实际上是在内部实现了简单的SubscriberConsumerSubscriber,但是并不是public的,所以不能直接使用。

简单的例子

根据JDK 9中提供的SubmissionPublisher<T>咱们来写一个小例子。

    public static void main(String[] args) {
        // 用于承接返回值的任务
        CompletableFuture<Void> task;
        // try-with-resource来控制资源
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            System.out.println("默认缓冲容量: " + publisher.getMaxBufferCapacity());
            // 传入打印方法来处理元素
            task = publisher.consume(System.out::println);
            // 打印数字,调用发布者进行信息处理
            IntStream.range(1, 6)
                    .forEach(publisher::submit);
        }
        if (task != null) {
            try {
                // 当所有订阅者处理完毕后调用
                task.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

在这个例子里面进行了以下几件事。

  1. 声明一个CompletableFuture用于捕获后续的处理事件。
  2. 开启资源用于进行流消息订阅
  3. 设置流的订阅方法(订阅者)
  4. 进行发布者的信息发送
  5. 阻塞主方法等待处理完毕后结束

其中pub.getMaxBufferCapacity()会打印默认的缓存空间256。在调用publisher.consume的时候,是奖传入的Consumer在内部封装成一个Subscribr的简单实现类,用于订阅信息的发送,实时上后续数据的订阅者就是在这步创建的。

当publisher进行调用的时候,调用submit发送数据,publisher有两个方法用于发送数据,一个是submit,一个是offer。两个方法下面实际都是调用的doOffer方法,所以,offer方法提供了置顶延迟时间后丢弃的策略,而submit是offer的简单实现,是一致阻塞不丢弃。

最后

不得不说响应式流是java中响应式编程的基础,而JDK 9中也提供了Reactive Streams的“简单”实现。之所示简单是打引号的是因为实际上还有点绕的,有兴趣的同学可以追一下SubmissionPublisher<T>的实现,有一些思想的经典实现,比如用整数中的7位来作为状态机。在下一篇中我们再聊一下JDK 9中的数据交互顺序。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容