简介
- 相关网站:GitHub地址、官方文档、中文文档。
- RxJava2 通过一种扩展的 观察者模式 来实现 异步 执行任务。
- RxJava2 能方便地进行 线程 切换。
- RxJava2 能方便地把 异步 执行的代码写在一处。
基础知识
相关概念
-
Observable: 被观察者 。 -
Observer: 观察者 。 -
Subscriber: 订阅者 ,实现了Observer接口,多了unsubscribe(),用来取消订阅。 -
Subscription:类似Subscriber,Observable调用subscribe()方法返回的对象。 -
Subject:可以当作Observable或Observer来用。 -
subscribe(): 订阅 方法。 -
Event:事件。 -
Scheduler: 调度器 ,相当于 线程控制器 。 -
Action0:接口,里面只有一个无返回值0参数的call(),同理有Action1、Action2等,代表着1个、2个参数等。 -
Func0:接口,类似Action0,区别是Func0有返回值。
相关方法
-
onNext():在 事件队列 中,进入下一个事件时调用,同时也是 事件处理方法 。 -
onCompleted(): 事件队列 完成时调用。 -
onError(): 事件队列 错误时调用。
简单示例
1. 普通示例
1. 定义Observable
Observable<Integer> observable = Observable.create(emitter -> {
// Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
// 发送消息
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 消息发送完毕
emitter.onComplete();
});
2. 定义Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
// 订阅时
// Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
// 接收到事件时
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
// 接收到错误时
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
// 事件接收完毕时
}
};
3. 订阅
注意 :此处代码的书写顺序看起来不符合直觉,这样设计是为了便于 链式调用 。
observable.subscribe(observer);
2. 链式示例
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
// Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
// 发送消息
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 消息发送完毕
emitter.onComplete();
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
// 订阅时
// Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
// 接收到事件时
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
// 接收到错误时
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
// 事件接收完毕时
}
});
3. 无Observer示例
subscribe()的所有重载形式
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
代码
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
emitter.onNext(4);
}).subscribe(
integer -> Log.d(TAG, "onNext: " + integer),
throwable -> Log.d(TAG, "onError"),
() -> Log.d(TAG, "onComplete"));
示例解说
-
emitter是ObservableEmitter类型,用于发射onNext(),onComplete(),onError()消息。 -
Disposable对象可以保存,日后通过调用Disposable.dispose()来 中断订阅 , 中断订阅 以后发送的消息无法被Observer接收到。 -
Observable可以不发送onComplete()或onError()。 - 当
Observable发送了一个onComplete()后,Observable在onComplete()之后的onNext()将会继续发送,而Observer收到onComplete()之后将不再继续接收onNext()。 - 当
Observable发送了一个onError()后,Observable在onError()之后的onNext()将继续发送,而Observer收到onError()之后将不再继续接收onNext()。 -
onComplete()和onError()必须 唯一 并且 互斥 ,即只能发送一个onComplete()或onError()。
事件流向图
| 事件流向 | 示意图 |
|---|---|
只发送onNext()事件 |
![]() |
发送onComplete()事件 |
![]() |
发送onError()事件 |
![]() |
线程相关
切换线程示例
-
subscribeOn():控制observable在什么 线程 中发送事件。 -
observeOn():控制observer在什么 线程 中处理事件。
// observable的链式操作中
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
线程种类
| 线程种类 | 描述 |
|---|---|
Schedulers.newThread() |
新 线程 。 |
Schedulers.computation() |
用于CPU计算密集型的操作的 线程 。 |
Schedulers.io() |
用于IO操作的 线程 ,如网络IO,文件IO,数据库IO。 |
AndroidSchedulers.mainThread() |
Android 的 UI主线程 。 |
注意事项
-
Observable和Observer默认是在 UI主线程 中运行的。 -
Observable多次切换线程的话,只有第一次有效。 -
Observer多次切换线程的话,只有最后一次生效。
过滤操作符
Sample
简介
-
Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。 - 会导致某些事件丢失。
示意图

变换操作符
Map
示意图

简介
用于将一种类型的 事件 转换为另一种类型的 事件 。
啰嗦示例
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, s);
}
});
简洁示例
Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
.map(integer -> "This is result " + integer)
.subscribe(s -> Log.d(TAG, s));
FlatMap
示意图

简介
-
FlatMap将单个 事件 或 事件队列 变换为一个发送单个 事件 或 事件队列 的Observable。 - 可以通过
FlatMap将 事件 变换为Observable来实现链式中连接多个Observable: Observable->事件->Observable->事件...->Observer 。 -
FlatMap并不保证 事件 的顺序,如果需要保证顺序则使用ConcatMap,使用方法同FlatMap。
啰嗦示例
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
}).flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
简洁示例
Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
.flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
})
.subscribe(s -> Log.d(TAG, s));
ConcatMap
-
ConcatMap保证 事件 的顺序,使用方法同FlatMap。 - 示例省略,参考
FlatMap。
Zip
示意图

简介
- 组合多个
Observable发送的 事件 ,然后发送这个 事件组合 。 - 事件组合 的顺序是严格按照 事件 发送的顺序来进行的。
- 发送 事件 的数量,与所有
Observable中 事件 数量最少的那个一样。 -
注意:这多个
Observable在同一个 线程 时无法将 事件 组合发送,即发送一个Observable的所有 事件 以后,再发送另一个Observable。 - 应用场景:一个界面需要展示用户的一些信息,而这些信息分别要从两个服务器接口中获取,而只有当两个都获取到了之后才能进行展示。
啰嗦示例
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "onNext: " + value);
}
});
简洁示例
Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, (integer, s) -> integer + s).subscribe(s -> Log.d(TAG, "onNext: " + s));
特殊情况处理
对于Observer在 UI主线程 进行 事件 处理,但是接收到 事件 时,所在的Activity或Fragment已经退出的情况,需要通过调用Disposable.dispose()或Subscription.cancel()来 中断订阅 。
1. 创建CompositeDisposable
-
CompositeDisposable对象是储存Disposable对象的容器。 - 作为
Activity或Fragment的成员:CompositeDisposable mCompositeDisposable = new CompositeDisposable();
2. 保存Disposable
在Observer.onSubscribe(...)中:
@Override
public void onSubscribe(Disposable d) {
mCompositeDisposable.add(d);
}
3. 中断订阅
在Activity或Fragment的onDestroy()中:
mCompositeDisposable.clear();
Flowable
简介
-
Flowable类似Observable,但是性能不如Observable。 -
Flowable为解决 Backpressure问题 而生。
Backpressure问题简介
- Backpressure问题 :当 事件 的发送远远快于 事件 的消耗时,未消耗的 事件 会堆积起来,最终可能发生 OOM 。
- 出现 Backpressure问题 可能是因为两个原因:
- 某个
Observable发送 事件 的速度太快,或数量太多。 - 某个
Observer处理 事件 的速度太慢。
- 某个
- 事件 的发送和接收在同一个 线程 的时候不会出现这种问题,因为必定会处理完一个 事件 以后才能继续发送下一个 事件 。
简洁示例
Flowable<Integer> upstream = Flowable.create(emitter -> {
// 直到下游开始请求事件
while (emitter.requested() == 0) {
if (emitter.isCancelled())
break;
}
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit complete");
emitter.onComplete();
}, BackpressureStrategy.ERROR);
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
示例解说
-
BackpressureStrategy( 背压策略 ),表示 事件 发送速度快于消耗速度时说使用的策略,有以下几种:-
MISSING:多出来的 事件 直接丢弃。 -
ERROR:抛出MissingBackpressureException异常。 -
BUFFER:全部 事件 缓存直到被消耗,注意 OOM 问题。也可以通过Flowable.onBackpressureBuffer()设置。 -
DROP:丢弃最近发送的 事件 。也可以通过Flowable.onBackpressureDrop()设置。 -
LATEST:只保留一个最近的 事件 覆盖前一个 事件 。也可以通过Flowable.onBackpressureLatest()设置。
-
-
Subscription.cancel():可以用于 取消订阅关系 。 -
Subscription.request(..):告知Flowable,Observer的 事件请求量 ,可以多次调用,每次调用累加 事件请求量 计数器。 -
FlowableEmitter.requested():得知Observer的 事件请求量 。


