reactive stream 响应式流

1 初识Reactive Stream

反应式流

2015 年反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:

  • Subscription 接口定义了连接发布者和订阅者的方法 - Publisher<T> 接口定义了发布者的方法 - Subscriber<T> 接口定义了订阅者的方法 - Processor<T,R> 接口定义了处理器

Reactive Stream(以下简称RS) 规范诞生后,RxJava 从 RxJava 2 开始实现 RS 规范

下图展示了订阅者与发布者交互的典型场景:
image
image

RS 基于流进行处理可以更高效地使用内存,把业务逻辑从模板代码中抽离出来,把代码从并发、同步问题中解脱出来,同时还可以提高代码的可读性。

RS 在某些方面是迭代器模式和观察者模式的结合,同时存在数据的 Pull 和 Push。 订阅者先请求 N 个项目,然后发布者推送最多 N 个项目给订阅者。
image
image

Java 9 中的 Flow 类定义了反应式编程的API。 实际上就是拷贝了 RS 的四个接口定义,然后放在 java.util.concurrent.Flow 类中。 Java 9 提供了 SubmissionPublisherConsumerSubscriber 两个默认实现。

Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。 相比之下

  • Stream 更侧重于流的过滤、映射、整合、收集 - 而 Flow 更侧重于流的产生与消费(下面的代码基于JDK11)

(1) 订阅 Subscription

Subscription 用于连接 Publisher 和 Subscriber。 Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。 Subscription 主要有两个方法:

  • request:订阅者调用此方法请求数据 - cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系

public static interface Flow.Subscription { public void request(long n); public void cancel(); }

(2) 发布者 Publisher

Publisher 将数据流发布给注册的 Subscriber。 它通常使用 Executor 异步发布项目给订阅者。 Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。

  • subscribe:订阅者订阅发布者

@FunctionalInterface public static interface Flow.Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }

(3) 订阅者 Subscriber

Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。 对于给定订阅(Subscription),调用 Subscriber 的方法是严格按顺序的。

  • onSubscribe:发布者调用订阅者的这个方法来异步传递订阅 - onNext:发布者调用这个方法传递数据给订阅者 - onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法 - onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法

public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }

(4) 处理器 Processor

Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。

(5) 背压 back pressure

Subscriber 向 Publisher 请求消息,并通过提供的回调方法被激活调用。如果 Publisher 的处理能力比 Subscriber 强得多,需要有一种机制使得 Subscriber 可以通知 Publisher 降低生产速度。Publisher 实现这种功能的机制被称为背压。提供数据生产者和消费者的消息机制,协调它们之间的产销失衡的情况。 Java 9 中的 Flow API 没有提供任何 API 来发信号或者处理背压,需要开发者自行处理背压。jdk 官方建议参考 RxJava 的背压处理方式。

(6) 事件顺序

反应式流中的事件顺序: a.创建发布者和订阅者,分别是 Publisher 和 Subscriber 的实例 b.订阅者调用发布者的 subscribe 进行订阅 c.发布者调用订阅者的 onSubscribe 传递订阅 Subscription d.订阅者调用 Subscription 的 request 方法请求数据 e.发布者调用订阅者的 onNext 方法传递数据给订阅者 f.数据传递完成后发布者调用订阅者的 onComplete 方法通知完成

参考

反应式流 - Reactive Stream

</pre>

1 初识Reactive Stream

反应式流

2015 年反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:

  • Subscription 接口定义了连接发布者和订阅者的方法
  • Publisher 接口定义了发布者的方法
  • Subscriber 接口定义了订阅者的方法
  • Processor<T,R> 接口定义了处理器

Reactive Stream(以下简称RS) 规范诞生后,RxJava 从 RxJava 2 开始实现 RS 规范

下图展示了订阅者与发布者交互的典型场景:
image

RS 基于流进行处理可以更高效地使用内存,把业务逻辑从模板代码中抽离出来,把代码从并发、同步问题中解脱出来,同时还可以提高代码的可读性。

RS 在某些方面是迭代器模式和观察者模式的结合,同时存在数据的 Pull 和 Push。
订阅者先请求 N 个项目,然后发布者推送最多 N 个项目给订阅者。


image

Java 9 中的 Flow 类定义了反应式编程的API。
实际上就是拷贝了 RS 的四个接口定义,然后放在 java.util.concurrent.Flow 类中。
Java 9 提供了 SubmissionPublisherConsumerSubscriber 两个默认实现。

Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。
相比之下

  • Stream 更侧重于流的过滤、映射、整合、收集
  • 而 Flow 更侧重于流的产生与消费(下面的代码基于JDK11)

(1) 订阅 Subscription

Subscription 用于连接 Publisher 和 Subscriber。
Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。
Subscription 主要有两个方法:

  • request:订阅者调用此方法请求数据
  • cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
public static interface Flow.Subscription {
    public void request(long n);
    public void cancel();
}

(2) 发布者 Publisher

Publisher 将数据流发布给注册的 Subscriber。
它通常使用 Executor 异步发布项目给订阅者。
Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。

  • subscribe:订阅者订阅发布者
@FunctionalInterface
public static interface Flow.Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

(3) 订阅者 Subscriber

Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。
对于给定订阅(Subscription),调用 Subscriber 的方法是严格按顺序的。

  • onSubscribe:发布者调用订阅者的这个方法来异步传递订阅
  • onNext:发布者调用这个方法传递数据给订阅者
  • onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法
  • onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

(4) 处理器 Processor

Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。

(5) 背压 back pressure

Subscriber 向 Publisher 请求消息,并通过提供的回调方法被激活调用。如果 Publisher 的处理能力比 Subscriber 强得多,需要有一种机制使得 Subscriber 可以通知 Publisher 降低生产速度。Publisher 实现这种功能的机制被称为背压。提供数据生产者和消费者的消息机制,协调它们之间的产销失衡的情况。 Java 9 中的 Flow API 没有提供任何 API 来发信号或者处理背压,需要开发者自行处理背压。jdk 官方建议参考 RxJava 的背压处理方式。

(6) 事件顺序

反应式流中的事件顺序: a.创建发布者和订阅者,分别是 Publisher 和 Subscriber 的实例 b.订阅者调用发布者的 subscribe 进行订阅 c.发布者调用订阅者的 onSubscribe 传递订阅 Subscription d.订阅者调用 Subscription 的 request 方法请求数据 e.发布者调用订阅者的 onNext 方法传递数据给订阅者 f.数据传递完成后发布者调用订阅者的 onComplete 方法通知完成

参考

反应式流 - Reactive Stream

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容

  • 1. 我们总习惯向陌生人袒露心声,对身边的人谨言慎行。要害,往往握在平时最亲近的人手中 。 2. 总能和萍水相逢的...
    由榔阅读 196评论 0 0
  • part1 「喂,那个,珊儿,妳将这封信给三年级的学长,若完成任务,今天晚上我请你吃牛排大餐。」 宋眉瞪着圆圆的,...
    伊伊8899阅读 960评论 14 11
  • 在AppStore中的应用越来越重视动画效果的使用,一个良好动画效果可以让两个状态之间平滑地过度,也可以利用动画吸...
    Devil雅馨阅读 3,529评论 16 32