1. RxJava2 : 什么是观察者模式
2. RxJava2 : 创建操作符(无关时间)
3. Rxjava2 : 创建操作符(有关时间)
4. Rxjava2 : 变换操作符
5. Rxjava2 : 判断操作符
6. Rxjava2 : 筛选操作符
7. Rxjava2 : 合并操作符
8. Rxjava2 : do操作符
9. Rxjava2 : error处理
10. Rxjava2 : 重试
11. Rxjava2 : 线程切换
api | use |
---|---|
filter | {{filter}} |
ofType | {{ofType}} |
elementAt | {{elementAt}} |
firstElement & lastElement | {{firstElement}} |
distinct | {{distinct}} |
distinctUntilChanged | {{distinctUntilChanged}} |
take / takeLast | {{take}} |
skip / skipLast | {{skip}} |
throttleFirst / throttleLast | {{throttleFirst}} |
throttleWithTimeout / debounce | {{throttleWithTimeout}} |
filter
- 筛选
Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer != 4;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:1
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:2
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:3
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:5
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:6
02-11 17:46:46.206 17513-17513/... D/SplashActivity: onComplete
ofType
- 筛选出指定类型
Observable.just(1, "one", 2, "tow", 3, "three")
.ofType(Integer.class)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:1
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:2
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:3
02-11 17:54:15.436 19570-19570/... D/SplashActivity: onComplete
elementAt
- 选择元素
1.获取指定位置的元素
2.准许越界,如果越界,支持指定默认值
//这是越界的情况
Disposable subscribe = Observable.just(1, 2, 3, 4)
.elementAt(5,10)
.subscribe(integer ->
Log.d(TAG, "integer:" + integer));
log
02-12 09:37:34.093 13153-13153/... D/SplashActivity: integer:10
firstElement / lastElement
- firstElement
仅取第一个 - lastElement
仅取最后一个
Disposable subscribe = Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "integer:" + integer);
}
});
log
02-12 09:28:26.753 10462-10462/... D/SplashActivity: integer:1
distinct
- 过滤重复的,只要出现过得就不会再出现
Observable.just(1, 2,3,1,3)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:1
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:2
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:3
02-11 18:00:08.836 20056-20056/... D/SplashActivity: onComplete
distinctUntilChanged
- 过滤重复的,但过滤掉的必须是连续重复的
Observable.just(1, 2, 3, 1, 3, 3, 4, 4)
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:2
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:4
02-11 18:02:59.656 20334-20334/... D/SplashActivity: onComplete
take / takeLast
- take
取指定数目的,如果小于指定数目,则会进入 error(从头取) - takeLast
取指定数目的,如果小于指定数目,则会进入 error(从尾取)
Observable.just(1, 2, 3, 4, 5, 6)
.take(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:1
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:2
02-11 18:35:23.196 21766-21766/... D/SplashActivity: onComplete
skip
- skip
正序跳过指定 count,如果不足,则会进入 onError - skipLast
倒序跳过指定 count,如果不足,则会进入 onError
Observable.just(1, 2, 3, 4).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:3
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:4
02-12 09:19:47.243 9691-9691/... D/SplashActivity: onComplete
throttleFirst / throttleLast
- throttleFirst
一段时间间隔的第一个(接到第一个之后开始计时,当超过指定时间之后去接下一个) - throttleLast
一段时间间隔的最后一个
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(1600);
e.onNext(3);
Thread.sleep(2100);
e.onNext(4);
e.onComplete();
}
}).throttleFirst(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
log
02-12 10:01:16.243 14629-14629/... D/SplashActivity: integer:1
02-12 10:01:18.343 14629-14629/... D/SplashActivity: integer:3
02-12 10:01:20.443 14629-14629/... D/SplashActivity: integer:4
02-12 10:01:20.443 14629-14629/... D/SplashActivity: complete
throttleWithTimeout / debounce
- throttleWithTimeout
每次都是两个数据一同判断,如果第二次的接收到的数据和第一次接收到的数据时间间隔小于指定间隔,则会丢弃第一次数据,之后,第二次接收的数据成为第一次数据,而第三次数据变为第二次数据,以此类推,决定是否接收 - debounce
同
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(500);
e.onNext(3);
Thread.sleep(500);
e.onNext(4);
Thread.sleep(500);
e.onNext(5);
Thread.sleep(500);
e.onNext(6);
e.onComplete();
}
}).throttleWithTimeout(1, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
log
02-12 10:15:56.573 16357-16357/... D/SplashActivity: integer:6
02-12 10:15:56.573 16357-16357/... D/SplashActivity: onComplete