概述
RxJava是响应式编程(Reactive Extensions)在JVM平台上的实现,即用java语言实现的一套基于观察者模式的异步编程接口,目前有1.x版本和2.x版本两套实现。RxJave2即RxJave库的2.x版本,这篇文章即介绍这个版本。
背景介绍
RxJava的1.x版本和2.x版本是不兼容的两个库,托管在不同分支上也使用不同的包名,官方指出1.x版本在2017年6月1日开始停止添加新内容,保持修改bug的状态,并在2018年3月31日停止开发。
RxJava的2.0.1版本于2016年11月12日正式发布,和1.x版本并行开发。RxJave2按照Reactive-Streams标准对接口进行了重写,继续支持java 6+和Android 2.3+,性能得到提升,更方便使用java8的lambda表达式,并把1.x中的背压功能单独分离出来。
在功能使用方面,2.x和1.x是大同小异的,大部分操作符都没变,实现原理也是相同的,所以下面会偏重介绍两个版本的不同点,关于RxJave的一些共性的基础的介绍可以参考本系列文章1.x版本部分。
简单使用
2.x版本的包名已经改变,maven库的gradle引用为:
compile 'io.reactivex.rxjava2:rxjava:x.y.z'
在1.x版本,被观察者的角色主要使用Observable,在2.x版本中增加了Flowable和Maybe,Flowable是2.x版本负责背压处理的被观察者,Observable中不再支持背压功能。
Flowable的简单使用代码如下:
Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.noNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}, FlowableEmitter.BackpressureMode.BUFFER);
Disposable disposable = flowable
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("RX", integer.toString());
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);//设置请求数,不设置默认也是这个数
}
@Override
public void onNext(Integer integer) {
Log.d("RX", integer.toString());
}
@Override
public void onError(Throwable t) {
Log.d("RX", t.toString());
}
@Override
public void onComplete() {
Log.d("RX", "Complete");
}
});
我们可以看到create函数多了个参数,代表背压策略,背压策略后续会有文章详细讲解;subscribe函数的参数从Subscriber变成了FlowableEmitter;1.x版本的Action不见了,我们用的是Consumer;Subscriber其实也是用的Reactive-Streams规范中的Subscriber,比1.x多了onSubscribe函数,这个函数传入的参数Subscription是很重要的,必须调用它的request方法,下层才能收到事件通知,当然,为了使用方便,这个方法在框架中是有默认调用的,设置默认请求数为Long.MAX_VALUE。从1.x到2.x细节变化还是不少的,但总体来看,使用起来跟1.x还是差不多,用过1.x的应该可以很快适应。
Reactive-Streams简介
上面说到,2.x版本是根据Reactive-Streams实现的,所以再简单介绍下Reactive-Streams。
Reactive Streams是一个基于JVM的面向流的库包的标准和规范,具体如下:
1.处理潜在的无边界限制的元素
2.顺序
3.在组件之间异步传递元素
4.使用强制性的非堵塞的抗压。
Reactive Streams由以下部分组成:
SPI:定义了不同实现组件之间的交互层和互操作性
API:定义 Reactive Streams用户使用的类型
技术兼容Kit(TCK):实现的标准测试。
Reactive Streams是一个规范,构建系统的开发者们需要使用该规范的实现。Reactive Streams的目标是增加抽象层,而不是进行底层的流处理,规范将这些问题留给了库实现来解决。
具体使用Reactive Streams比较简单,只需实现几个接口即可,就像上面说的,Reactive Streams的目标是增加抽象层,而不是进行底层的流处理,每个具体流处理库可能都有自己的实现方式,但是只要大家都遵循了这个规范,那就可以无缝切换了。
Reactive Streams规范的一个主要目标是通过异步边界来解耦系统组件。在同步世界中,每个功能或是操作都是按照顺序处理的,一个接着一个;除了第一个外,其他每一个操作都依赖于前一个操作。这种方式增加了维护成本,并且不利于构建出响应式系统。
Reactive Streams规范的另一个主要目标是为压力处理定义一种模型。流处理的理想范式是将数据从发布者推送到订阅者,这样发布者就可以快速发布数据,同时通过压力处理来确保速度更快的发布者不会对速度较慢的订阅者造成过载。压力处理通过使用流控制来确保操作的稳定性并能实现优雅降级,从而提供弹性能力。
通过上面介绍,大家对Reactive Streams可能有了大概了解,这些大多是参考自一些权威写的文章,内容偏向整体的抽象概述,具体可以参考Reactive Streams的接口代码和实现规范的详情,这些都在GitHub中给出了。
2.x与1.x差异概述
一些差异前面已经有所提及,这里再总结一下:
- maven地址和项目包名不同
- RxJave2不再接受参数为null的数据流,这么做会马上抛出NullPointerException
- Observable和Flowable分别处理无背压和有背压功能部分
- 新增Maybe类型,可以看作Single和Completable的合体类
- 完全遵循Reactive Streams接口规范
- 方法都可以抛出异常
- Action系列接口被Consumer取代
- Func系列接口被重命名为Function系列
- 调用Subscription的request方法时就会调用noNext等周期函数,所以要把一些后面需要用到的初始化操作放在request函数调用之前。
- 少量操作符差异
更详细内容可以参考官网
参考文章:
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0907/6604.html
http://www.infoq.com/cn/news/2015/12/reactive-streams-introduction