总览
1 创建操作符
1.1 用途
用于创建 被观察者对象&发送事件
1.2 详细介绍
1.2.1 基本创建
用于完整创建1个被观察者对象
create()
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
//定义需要发送的事件序列
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//ObservableEmitter是事件发射器,做了两件事:定义发射事件 & 向观察者发送事件
// 注:建议发送事件前检查观察者的isUnsubscribed状态,以便在没有观察者时,让Observable停止发射数据
if (!observer.isUnsubscribed()) { // 或者 !emitter.isDisposed()
e.onNext("菜品1");
e.onNext("菜品2");
e.onNext("菜品3");
}
e.onComplete();//或者e.onError(exception);
}
});
1.2.2 快速创建
用于快速的创建被观察者对象
just()
可以直接发送传入的事件,但只能发送10个及以下事件
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1,2,3,4) // 创建完毕
fromArray()
- 可以直接发送传入的数组数据, 可发送10个以上事件(数组形式)
- 可遍历数组元素
// 数据准备
Integer[] data = { 0, 1, 2, 3, 4 };
// 在创建后就会将该数组转化成Observable并把数据取出逐个发送
Observable.fromArray(data)
fromIterable()
- 可以直接发送传入的集合List数据, 可发送10个以上事件(集合形式)
- 可遍历集合元素
List<Integer> list = List.of<>(1, 2, 3);
never() | empty() | error()
一般用于测试
<-- empty() -->
//仅发送Complete事件,直接通知完成,观察者接收后会直接调用onCompleted()
Observable observable1=Observable.empty();
<-- error() -->
// 仅发送Error事件,直接通知异常,观察者接收后会直接调用onError()
Observable observable2=Observable.error(new Exception())
<-- never() -->
// 被观察者不发送任何事件,观察者接收后什么都不调用
Observable observable3=Observable.never();
1.2.3 延迟创建
用于定时操作、周期性操作
defer()
只有当观察者开始订阅才创建Observable,为每个观察者创建新的 Observable,保证随时拿到Observable中最新的数据
//数据准备
List<String> list = new ArrayList<>();
list.add("菜品1");
//此时被观察者对象还没创建
Observable<String> observable1 = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.fromIterable(list);
}
});
// 第一次订阅,被观察者对象动态创建
observable1.subscribe(observer);
// 更新数据
list.add("菜品2");
list.add("菜品3");
// 再次订阅,接受到observable1最新的数据
observable1.subscribe(observer);
// 结果
observer 开始订阅subscribe
RxjavaText: 接收到Next事件:菜品1
RxjavaText: 接收到Complete事件
RxjavaText: observer 开始订阅subscribe
RxjavaText: 接收到Next事件:菜品1
RxjavaText: 接收到Next事件:菜品2
RxjavaText: 接收到Next事件:菜品3
RxjavaText: 接收到Complete事件
timer()
- 可快速创建一个被观察者对象,指定的延迟时间到来后,发送1个数值0(Long类型),即调用onNext (0)。一般用于检测
- timer操作符默认运行在一个新的线程上,可以自定义
//延迟5s,发送0
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "接收到Next事件:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到Error事件");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到Complete事件");
}
});
// 结果
RxjavaText: 接收到Next事件:0
RxjavaText: 接收到Complete事件
interval()
- 能指定初次发送事件的延迟时间
- 能指定间隔时间,周期性发送整数序列
- 发送事件序列:从0开始,+1 的无限整数序列
//延迟5s发送第一个事件,间隔2秒就发送一个事件,从0开始递增1,无限个
Observable.interval(5, 2, TimeUnit.SECONDS)
intervalRange()
- 周期性发送整数序列
- 与interval()相比能指定发送数据的起始数、个数
// 起始数据、数据数量、发送第一个事件的延时时间、间隔时间
Observable.intervalRange(2, 3 , 2, 1, TimeUnit.SECONDS)
// 结果
2、3、4
range()
- 发送整数序列,与intervalRange()相比,不能指定初次延时时间和间隔时间
- 支持Integer类型
// 设为负数,抛出异常
// 起始数据、数据数量
Observable.range(2,3)
rangeLong()
- 与range()相同
- 支持Long类型
2 变换操作符
2.1 用途
对事件序列中的事件 / 整个事件序列 进行变换,使得其转变成不同的事件 / 整个事件序列
2.2 详细介绍
Map()
- 针对每个事件进行处理,变换成另一个事件
原理图
// 变换:n => n*n
Observable.just(1,2,3)
// 通过Function<原事件,变换后事件>函数对事件进行变换
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer data) throws Exception {
Log.d(TAG, "发射Next事件:" + data);
return data*data;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "observer 开始订阅subscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到Next事件:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到Error事件");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到Complete事件");
}
});
// 结果
RxjavaText: observer 开始订阅subscribe
D/RxjavaText: 发射Next事件:1
D/RxjavaText: 接收到Next事件:1
D/RxjavaText: 发射Next事件:2
D/RxjavaText: 接收到Next事件:4
D/RxjavaText: 发射Next事件:3
D/RxjavaText: 接收到Next事件:9
D/RxjavaText: 接收到Complete事件
FlatMap()
- 将事件序列拆分,单独变换单个事件,最后再无序合并成新的事件序列
- 为事件序列中每个事件都创建一个 Observable 对象
- 将 每个 原始事件 转换后的 新事件 都放入到对应的新的 Observable对象
- 将新建的每个Observable 无序合并到一个 新建的 Observable 对象
- 新建的 Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
原理图
ConcatMap()
- 与flatMap()类似,但是是有序合并,即最后合并的事件序列 = 原序列拆分后产生的事件序列
Buffer()
- 用来缓存发送事件。定期从Observable收集数据到一个集合 & 放到缓存区中,然后把这些数据集合打包发射,而不是一次发射一个
- 需要设置缓存区大小count(每次从被观察者获取的事件数量)和步长slip(每次新获取的事件数量)
- 每个缓存里面包含 count 个元素,每个缓存的开始元素索引(发送事件看成一个List集合)和skip取余都为0,即 index % skip == 0。 每隔 skip 个创建一个缓存。
- onNext()发送count倍数的事件,剩余的都在onComplete()中发送
(1) 当count > slip时,buffers中可能同时会有多个缓存区,这个时候缓存会有重叠的元素
Observable.just("菜品1","菜品2","菜品3","菜品4","菜品5","菜品6","菜品7").buffer(3,2)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "observer 开始订阅subscribe");
}
@Override
public void onNext(List<String> list) {
Log.d(TAG,"目前缓存里的事件个数:" + list.size());
for (int i = 0; i<list.size();i++){
Log.d(TAG, "接收到Next事件:" + list.get(i));
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到Error事件");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到Complete事件");
}
});
// 结果
RxjavaText: observer 开始订阅subscribe
RxjavaText: 目前缓存里的事件个数:3
RxjavaText: 接收到Next事件:菜品1
RxjavaText: 接收到Next事件:菜品2
RxjavaText: 接收到Next事件:菜品3
RxjavaText: 目前缓存里的事件个数:3
RxjavaText: 接收到Next事件:菜品3
RxjavaText: 接收到Next事件:菜品4
RxjavaText: 接收到Next事件:菜品5
RxjavaText: 目前缓存里的事件个数:3
RxjavaText: 接收到Next事件:菜品5
RxjavaText: 接收到Next事件:菜品6
RxjavaText: 接收到Next事件:菜品7
RxjavaText: 目前缓存里的事件个数:1
RxjavaText: 接收到Next事件:菜品7
RxjavaText: 接收到Complete事件
count > slip
(2) 当count < slip时,buffers在某些情况下没有缓存区,这个时候会有间隙,也就是说部分数据会丢失
Observable.just("菜品1","菜品2","菜品3","菜品4","菜品5","菜品6","菜品7").buffer(2,3)
// 结果
RxjavaText: observer 开始订阅subscribe
RxjavaText: 目前缓存里的事件个数:2
RxjavaText: 接收到Next事件:菜品1
RxjavaText: 接收到Next事件:菜品2
RxjavaText: 目前缓存里的事件个数:2
RxjavaText: 接收到Next事件:菜品4
RxjavaText: 接收到Next事件:菜品5
RxjavaText: 目前缓存里的事件个数:1
RxjavaText: 接收到Next事件:菜品7
RxjavaText: 接收到Complete事件
count < slip
3 组合/合并操作符
用途
组合多个被观察者 或者 合并多个待发送事件
详细介绍
3.1 组合多个被观察者
- 组合多个被观察者一起发送数据
concat() | concatArray()
- 按组合的顺序发送数据
- 区别:concat()可组合的被观察者<=4, concatArray() > 4
// 俺组合的顺序发送
Observable.concat(Observable.just(1,2), Observable.just(3,4), Observable.just(5,6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "observer 开始订阅subscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "接收到Next事件:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "接收到Error事件");
}
@Override
public void onComplete() {
Log.d(TAG, "接收到Complete事件");
}
});
// 结果
RxjavaText: observer 开始订阅subscribe
RxjavaText: 接收到Next事件:1
RxjavaText: 接收到Next事件:2
RxjavaText: 接收到Next事件:3
RxjavaText: 接收到Next事件:4
RxjavaText: 接收到Next事件:5
RxjavaText: 接收到Next事件:6
RxjavaText: 接收到Complete事件
merge() | mergeArray()
- 合并后按时间并行执行
// 发送顺序是 1和5(同时)、2和6、3和7
Observable.merge(
Observable.intervalRange(1,3,1,1,TimeUnit.SECONDS),
Observable.intervalRange(5,3,1,1,TimeUnit.SECONDS)
)
// 结果
RxjavaText: observer 开始订阅subscribe
D/RxjavaText: 接收到Next事件:1
D/RxjavaText: 接收到Next事件:5
D/RxjavaText: 接收到Next事件:2
D/RxjavaText: 接收到Next事件:6
D/RxjavaText: 接收到Next事件:3
D/RxjavaText: 接收到Next事件:7
D/RxjavaText: 接收到Complete事件
concatDelayError() | mergeDelayError()
- 在使用concat()和merge()操作符时,若其中一个被观察者发出Error事件,则立刻终止其他被观察者的事件发送,该操作符的作用就是用来解决其他被观察者不能发送事件的问题
Observable.concat(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NumberFormatException());
}
}), Observable.just(3, 4, 5))
// concat 结果
RxjavaText: observer 开始订阅subscribe
RxjavaText: 接收到Next事件:1
RxjavaText: 接收到Next事件:2
RxjavaText: 接收到Error事件
Observable.concatDelayError(Observable.fromArray(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NumberFormatException());
e.onNext(3);
}
}), observable.just(4, 5)))
// concatDelayError结果
// 值得注意的是发出Error事件本身的被观察者,其Error后的事件也不能发送
RxjavaText: observer 开始订阅subscribe
RxjavaText: 接收到Next事件:1
RxjavaText: 接收到Next事件:2
RxjavaText: 接收到Next事件:4
RxjavaText: 接收到Next事件:5
RxjavaText: 接收到Error事件
3.2 合并多个事件
zip()
- 合并多个Observable发送的事件,组合生成一个新的事件序列
- 合并方式:按照原事件顺序进行对位合并
- 合并后的事件数量 = 多个Observable中事件数量最少的数量
- 常用场景:需要组合从多个地方获取的信息,比如需要从多个接口获取数据
Observable<Integer> observable1 = Observable.just(1,2,3);
Observable<String> observable2 = Observable.just("A","B","C","D");
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
})
// 结果
1A
2B
3C
combineLatest()
- 将第一个发送事件的Observable的最后一个事件与另外一个Observable发送的每个事件合并
Observable.combineLatest(
Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1个Observable发送的最新(最后)1个数据
// o2 = 第2个Observable发送的每1个数据
Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
return o1 + o2;
}
})
// 结果
合并的数据: 3 0
结果是: 3
合并的数据是: 3 1
结果是: 4
合并的数据是: 3 2
结果是: 5
combineLatestDelayError()
- 作用类似于concatDelayError()
reduce()
- 把被观察者需要发送的事件聚合成1个事件发送
// 结果为1+2+3+4+5 = 15
Observable.just(1, 2, 3, 4, 5).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
})
collect()
- 把发送事件收集到一个数据结构中并发送
Observable.just(1, 2, 3 ,4, 5)
.collect(
//创建数据结构,用于收集被观察者发送的事件
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
// 参数说明:(数据结构,发送事件)
public void accept(ArrayList<Integer> list, Integer integer)
throws Exception {
list.add(integer);
}
})
});
// 结果
[1,2,3,4,5]
3.3 发送事件前追加发送事件
startWith()
startWithArray()
- 用来追加发送事件,越后追加越先发送
Observable.just(4, 5, 6)
.startWith(0) // 追加单个数据
.startWithArray(1, 2, 3) // 追加多个数据
// 结果
1、2、3、0、4、5、6
3.4 统计发送事件数量
count()
- 统计被观察者发送事件的数量
4 功能性操作符
辅助Observable在发送事件时实现的一些功能需求
4.1 订阅
subscribe()
- 连接观察者 & 被观察者
// 被观察者.subscribe(观察者)
Observable.just(1,2,3)
.subscribe(new Observer<>....)
4.2 线程调度
- 切换 Observable & Observer 到指定的工作线程
- 一般情况下在UI线程(主线程)中创建Observable & Observer ,那么生产事件 & 接受响应事件 都发生在UI线程中
subscribeOn()
- 切换 Observable 到指定的工作线程
- 可以有多个subscribeOn(),但只有第一个生效
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "Observable的工作线程是: " + Thread.currentThread().getName());
e.onNext("菜品1");
}
}).subscribeOn(Schedulers.newThread()) //开启一个新线程 ,第一个切换生效
.subscribeOn(Schedulers.io()) // io线程,不生效
.subscribeOn(AndroidSchedulers.mainThread()) // 主线程,不生效
// 结果
Observable的工作线程由 main 切换到 新线程RxNewThreadScheduler-1
observeOn()
- 切换 Observer 到指定的工作线程
- 可多次指定,且每次指定都有效
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("菜品1");
}
}).observeOn(Schedulers.newThread()) //切换到新的线程
.observeOn(AndroidSchedulers.mainThread()) //切换到主线程
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Observer的工作线程: " + Thread.currentThread().getName() + " 接收到Next事件:" + s);
}
});
// 结果
Observer的工作线程: main 接收到Next事件:菜品1
4.3 延迟操作
delay()
- Observable延时发送事件
Observable.just(1, 2, 3)
.delay(3, TimeUnit.SECONDS) // 延迟3s再发送
4.4 在时间生命周期中操作
do操作符
4.5 错误处理
onErrorReturn()
- 捕获error事件 & 发送特殊事件 & 正常终止
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("菜品1");
e.onNext("菜品2");
e.onError(new Exception("错误"));
}
}).onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) throws Exception {
Log.d(TAG, "onErrorReturn 捕获Error:" + throwable.getMessage());
return "菜品3"; //发送新事件
}
})
// 结果
observer 开始订阅subscribe
接收到Next事件:菜品1
接收到Next事件:菜品2
onErrorReturn 捕获Error: 错误
接收到Next事件:菜品3
接收到Complete事件
onErrorResumeNext() | onExceptionResumeNext()
与onErrorReturn()使用方法相同
- onErrorResumeNext() 拦截 Throwable(如果拦截Exception,会传递错误给Observer的onError())
- onExceptionResumeNext() 拦截 Exception
retry()
- 当遇到Error事件,让Observable重新发射数据
retryUnit()
- 判断是否需要重发Error事件
retryWhen()
- 将Error传递给一个新的 Observable,并决定是否需要重新订阅原始被观察者(Observable)& 发送事件
4.6 重复发送操作
repeat()
- 无条件重复发送Observable的事件
repeatWhen()
- 有条件的重复发送Observable的事件
5 过滤操作符
用途
过滤Observable发送的事件 & Observer接受的事件
详细介绍
5.1 指定条件 过滤事件
Filter()
- 过滤特定条件的事件
Observable.just(1,2,3)
.filter(new Predicate<Integer>() {
// true,则继续发送; false,则不发送(即过滤)
@Override
public boolean test(Integer integer) throws Exception {
return integer < 2; // 过滤事件
}
}
ofType()
* 筛选出需要的类型事件
Observable.just(1, "111")
.ofType(Integer.class) // 筛选出 整型数据,过滤掉其他类型事件
skip() | skipLast()
- 跳过某个事件
Observable.just(1, 2, 3, 4, 5)
.skip(1) // 跳过正序的前1项
.skipLast(2) // 跳过正序的后2项
distinct() | distinctUnitChanged()
- 过滤事件序列中重复的事件 / 连续重复的事件
// 过滤掉1、2
Observable.just(1, 2, 3, 1 , 2 )
.distinct()
// 过滤掉3、4
Observable.just(1,2,3,1,2,3,3,4,4 )
.distinctUntilChanged()
5.2 指定事件数量 过滤事件
仅发送特定数量的事件
take()
- 指定Observer最多能接受的事件数量,按顺序筛选
takeLast()
- 指定Observer只能接受到的最后几个事件
5.3 指定时间 过滤事件
指定时间内发送事件
throttleFirst() | throttleLast()
- 发送时间内的第一次事件/最后一次事件
- 比如在某段时间内一直点击某个按钮,但只响应第一次操作
throttleFirst(1, TimeUnit.SECONDS) //筛选出每秒中第一个发送事件
sample()
- 发送某段时间内最后一次事件,与throttleLast()相同
throttleWithTimeout() | debounce()
- 发送指定时间周期内最后一次事件
...
e.onNext(1)
Thread.sleep(300)
e.onNext(2) // 相隔0.3s<1s,所以1被过滤
Thread.sleep(1200)
e.onNext(3) // 相隔1.2 s<1s,所以2被发送
...
throttleWithTimeout(1, TimeUnit.SECONDS) // 当前后两个发送事件间隔小于1(指定时间),则前者发送事被过滤掉
5.4 指定事件位置 过滤事件
发送指定的位置的事件
firstElement() | lastElement()
- 选取第一个/最后一个事件
elementAt()
- 能指定具体的位置,事件序列从0开始索引
elementAt(3) // 指定位置 < 序列最大长度,发送序列位置3的事件
elementAt(3, 9) // 指定位置 > 序列最大长度,发送默认事件9
elementAtOrError()
- 当指定位置 > 序列最大长度,抛出异常
6 条件/布尔操作符
用途
通过设置函数,判断Observable发送的事件是否符合条件
详细介绍
all()
- 用来判断所有发送事件是否满足某个条件,并发送判断结果(即,最后发送的事件是判断结果true or false)
sequenceEqual()
- 判定两个Observables需要发送的数据是否相同 & 发送结果事件
contains()
- 判断发送的事件中是否包含指定事件 & 发送结果事件
isEmpty()
- 判断发送所有事件是否为空
amb()
- 当有多个Observable发送事件,只发送最先发送的事件的Observable的事件
List<ObservableSource<Integer>> list= new ArrayList <>(); // 放置多个Observable
...
Observable.amb(list).subscribe(Observer)
takeWhile()
- 发送的数据满足设置的函数,则发送该项数据;否则不发送
takeUnit()
- 直到发送事件满足条件开始,停止发送事件
skipWhile()
- 跳过满足条件的事件,即直到判断结果为false,开始发送事件
skipUnit()
- 直到skipUnit(Observable)开始发送事件,外层的Observable才开始发送
// Observable1:每隔1s发送1个数据 = 从0开始,每次递增1
Observable.interval(1, TimeUnit.SECONDS)
// Observable2:延迟5s后开始发送1个Long型数据
.skipUntil(Observable.timer(5, TimeUnit.SECONDS))
defaultIfEmpty()
- 在只发送Complete事件的时候,可以发送一个默认值
defaultIfEmpty(10) //发送默认值10