RxJava是什么
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.这是官方对于该库的描述,意思是RxJava是Java VM上一个灵活的,使用可观测序列组成的异步的、基于事件的库。 它的核心最主要在于它的“异步”, Observable(被观察者)与 Observer/Subscriber(观察者),Observable可发出一系列事件,事件执行完毕后回调被观察者,这里的事件可以有非常多种形式,例如:网络请求、文件操作、数据加载、循环、延时操作等等。
相关概念
同步/异步: 关注的是消息通信机制,同步是指 发出一个调用,在没有得到结果之前,该调用就不返回,但是一旦调用返回,就得到返回值了; 而异步是指 调用发出后,调用直接返回,但不会立刻得到调用的结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用; 异步强调被动通知。
阻塞/非阻塞:关注的是程序在等待调用结果(消息,返回值)时的状态;阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。如java8 stream为阻塞式,Future为非阻塞式的; 非阻塞强调状态主动轮询。
并发(Concurrency)与并行(parallelism): 并发是一个更通用的概念,两个并发线程被调度在一个单核处理上运行,这就是并发性,而非并行. 并行性更可能出现在多核,多CPU或分布式系统上。编写代码时一般不区分两者。
函数式编程Functional Programming):函数式编程将程序描述为表达式和变换,以数学方程的形式建立模型,并尽量避免可变的状态。每个逻辑分类(filter,map,reduce等)都由不同函数所表示,这些实现底层次的变换,而用户定义的高阶函数作为参数来实现真正的业务。
函数响应式编程(Functional Reactive Programming)):响应式编程是建立在观者者模式上的一种编程范式,对异步数据流进行编程,同时该事件流是按时间排序的序列(不同于java8中的stream);虽然响应式编程框架不一定要求是函数式的,但是RxJava等响应式编程框架都是结合了函数式编程的。
背压(Backpressure):背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,简而言之,背压是流速控制的一种策略。
ReactiveX是什么
响应式扩展(Reactive Extensions, 简写为ReactiveX,Rx),最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源[1];
ReactiveX = Observer Pattern + Iterator Pattern + Functional Programming。Rx 让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序;
ReactiveX是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,ReactiveX的思想是跨平台的,学习其他语言的基本语法之后,可以做到 “learn once, write everywhere”;
Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
RxJava 是 ReactiveX 规范的JVM平台实现
组织结构
RxJava核心由以下四个interface定义:
Publisher : 消息发布者
Subscriber : 消息订阅者
Subscription : 一个订阅
Processor : Publisher + Subscriber 的结合体
它有什么作用
1.使你的代码更加简洁,增强可读性
2.对事件的处理具有非常好的灵活性
3.非常方便的实现响应式编程
4.对线程与并发问题的处理变得简单
5.Flow Control(Backpressure、Throttling等)
基本使用
引入RxJava依赖,这里使用的2.x版本
repositories {
maven { url 'https://oss.jfrog.org/libs-snapshot' }
}
dependencies {
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
最简单的开始
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onNext("rxjava");
e.onNext("test");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("MainActivity.onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable创建一个基于String类型的事件,然后被观察者调用subscribe方法订阅观察者。What?是不是搞反了。
ObservableEmitter为事件发射器,可调用onNext、onError、onComplete方法,Disposable用于解除订阅。
事件的执行内容是:依次输出“hello”,“rxjava”,“test”三个字符串。这里使用的方法链的方式,当然你也可以采用以下的方式
create方法是Observable的静态方法,用于构造一个Observable对象,如果说用于单纯的构造初始事件的话,除了create以外,还有许多方式可以构造观察者
interval & intervalRange & just & fromIterable & fromArray & timer
他们与create的差异主要在于,create创建的事件需要执行过程,而这些方法创建的事件显示是现成的,不需要额外操作,一旦与观察者关联后即可发送事件。
fromFuture & fromCallable 了解即可,Android中并不太适用
fromFuture, 事件从非主线程中产生; fromCallable, 事件从主线程中产生, 在需要消费时生产;
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<String> callable = new Callable<String>() {...};
Future<String> future = executor.submit(callable);
Consumer<String> onNext = new Consumer<String>() {...};
Flowable.fromCallable(callable).subscribe(onNext);
Flowable.fromFuture(future).subscribe(onNext);
ObservableEmitter和Disposable
ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext.
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件.
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件.
4.Observable可以不发送onComplete或onError.
5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
Disposable 调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件。但是并不能控制被观察者是否发射事件。
Observale & Flowable
当然我们也可以使用Flowable,而不用Observale
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onNext("rxjava");
e.onNext("test");
}
}, BackpressureStrategy.BUFFER).subscribe();
Observable: 不支持背压
Flowable : Observable新的实现,支持背压,同时实现Reactive Streams 的 Publisher 接口
其他形式的被观察者
除了上述描述的两种观察者形式外,还有Single & Completable & Maybe
Single
可以发射一个单独onSuccess 或 onError消息。它现在按照Reactive-Streams规范被重新设计,并遵循协议 onSubscribe (onSuccess | onError)
Single.create(new SingleOnSubscribe<Object>() {...)
.subscribe(new SingleObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Object value) {
}
@Override
public void onError(Throwable e) {
}
});
Completable
可以发送一个单独的成功或异常的信号,按照Reactive-Streams规范被重新设计,并遵循协议onSubscribe (onComplete | onError)
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
}
}).subscribe(new Action() {
@Override
public void run() throws Exception {
// complete
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// error
}
});
Completable
它是Single 和 Completable 的结合体。它可以发射0个或1个通知或错误的信号, 遵循协议 onSubscribe (onSuccess | onError | onComplete)
Maybe.create(new MaybeOnSubscribe<Object>() {
@Override
public void subscribe(MaybeEmitter<Object> e) throws Exception {
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
// success
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// error
}
}, new Action() {
@Override
public void run() throws Exception {
// complete
}
});
简化订阅
观察者除了用Observer对象外,我们还可以用Consumer单独接收onNext或者onError以及onSubscribe回调,用Action单独接收onComplete,下图是订阅关联的各个重载方法subscribe的参数列表
什么时候用 Observable
一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
如果处理的式同步流而你的Java平台又不支持Java Stream
什么时候用 Flowable
处理以某种方式产生超过10K的元素
有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的
线程调度
默认情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程工作,就在哪个线程发送事件, Observer也就在同样的线程中接收处理事件
- subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程
- 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效
- 允许多次指定订阅者接收线程,每调observerOn,下游都会进行线程切换
RxJava内置线程选项:
- Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
- Schedulers.newThread() 代表一个常规的新线程;
- AndroidSchedulers.mainThread() 代表Android的主线程
操作符
RxJava是基于事件序列、异步的操作库,它提供了许多操作符,这些操作符可以在整体事件序列中对元素或者事件本身进行一系列变换,下面为大家介绍一些常用的操作符的使用
concat
将多个Observable按顺序发射数据,不会交错,只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable
Observable observable1 = Observable.fromArray(1,2,3);
Observable observable2 = Observable.fromArray(4,5,6);
Observable.concat(observable1, observable2).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
System.out.println("MainActivity.accept" + o);
}
});
merge
将多个Observable数据合并,各个数据顺序可能交错
Observable observable1 = Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS);
Observable observable2 = Observable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS);
Observable.merge(observable1, observable2).subscribeOn(Schedulers.io()).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
System.out.println("MainActivity.accept" + o);
}
});
map
将一个Observable发射的数据根据某种映射关系转换为另一个形式的数据
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(R.mipmap.ic_launcher);
}
}).map(new Function<Integer, Drawable>() {
@Override
public Drawable apply(@NonNull Integer integer) throws Exception {
return getResources().getDrawable(integer);
}
}).subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
img.setImageDrawable(drawable);
}
});
flatMap
flatMap 操作符可以将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放到一个单独的 Observable,并且事件有可能交错
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onNext("b");
e.onNext("c");
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
List<String> list = new ArrayList();
list.add(s);
list.add(s);
list.add(s);
return Observable.fromIterable(list).delay(1,TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,s);
}
});
concatMap
作用等同于flatMap,唯一不同的是concatMap事件序列严格按照上流的发送顺序
zip
将多个Observable的事件压缩到一起再发送
Observable name = Observable.fromArray("lee","su","liu");
Observable age = Observable.fromArray(22,31,18);
Observable.zip(name, age, new BiFunction<String,Integer,User>() {
@Override
public User apply(@NonNull String o, @NonNull Integer o2) throws Exception {
return new User(o,o2);
}
}).subscribe(new Consumer<User>() {
@Override
public void accept(User o) throws Exception {
}
});
filter
对上流发射的数据进行筛选,过滤到不符合条件的数据
Observable.fromArray(1,2,3,4,5,6).filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer%2 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"过滤得到的整数" + integer);
}
});
take
事件数量限制,保留指定数量内的事件
Observable.fromArray(1,2,3,4,5,6).take(3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"value : " + integer);
}
});
doOnNext
在回调观察者onNext方法前进行额外的处理
Observable.fromArray(1,2,3,4,5,6).take(3)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"doOnNext : " + integer);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"value : " + integer);
}
});
debounce
用于事件的过滤,其官方解释是数据的发射都延时指定时间后进行,并且在这段延时内新数据将会把老数据替换掉,在每次执行e.onNext(value)后计时器重置。简单说,就是在指定时间内,只接收时间段内的最新数据。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).debounce(10, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"value : " + integer);
}
});
throttleFirst
用于事件的过滤,在指定时间间隔内只保留第一个事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for(int i=0;i<10;i++){
e.onNext(i);
SystemClock.sleep(1500);
}
}
}).throttleFirst(2000, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"value : " + integer);
}
});
compose
与 flatMap 类似,都是进行变换,但是compose操作与返回的都是Observable对象,并且激活并发送事件。compose操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
First
同样用于事件过滤,只发送所有事件中的第一个事件
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onNext("b");
e.onNext("c");
}
}).first(null).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG,"first value is "+s);
}
});
distinct & distinctUntilChanged
distinct( )过滤掉重复数据;distinctUntilChanged()过滤掉连续重复的数据。
Observable.just(1,2,1,1,3,4,5).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"distinct "+integer);
}
});
Observable.just(1,2,1,1,3,4,5).distinctUntilChanged().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG,"distinctUntilChanged "+integer);
}
});
总结
这篇文章带大家了解什么是RxJava,以及涉及到的相关概念。并且整理了RxJava的基本用法,以及常用的操作符的作用与使用方式。RxJava中的操作符非常多,这里只是列举了部分常用操作符讲解,更多的可以参考以下文档
RxJava Wiki:https://github.com/ReactiveX/RxJava/wiki
RxJava 中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/