Github地址:https://github.com/DingMouRen/RxJava2ExamplesDemo
Rxjava的四个概念:Observable 被观察者,Observer 观察者, subscribe 订阅,事件。
操作符
官网:http://reactivex.io/documentation/operators.html
1.create
作用:可用于获取一个被观察的对象
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
mTvLog.append("Observable 发射 1\n");
emitter.onNext(1);
mTvLog.append("Observable 发射 2\n");
emitter.onNext(2);
mTvLog.append("Observable 发射 3\n");
emitter.onNext(3);
mTvLog.append("Observable 发射 4\n");
emitter.onNext(4);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
mDisposable = disposable;
mTvLog.append("onSubscribe 获取到Disposable实例\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext -- "+integer+" Disposable的订阅状态:"+mDisposable.isDisposed()+"->"+(mDisposable.isDisposed()?"解除订阅":"订阅中")+"\n");
if (integer == 2){
mDisposable.dispose();
mTvLog.append("onNext -- "+integer+" Disposable的订阅状态:"+mDisposable.isDisposed()+"->"+(mDisposable.isDisposed()?"解除订阅":"订阅中")+"\n");
}
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError:"+e.getMessage());
}
@Override
public void onComplete() {
mTvLog.append("onComplete");
}
});
日志:
onSubscribe 获取到Disposable实例
Observable 发射 1
onNext -- 1 Disposable的订阅状态:false->订阅中
Observable 发射 2
onNext -- 2 Disposable的订阅状态:false->订阅中
onNext -- 2 Disposable的订阅状态:true->解除订阅
Observable 发射 3
Observable 发射 4
- 发射2后,调用mDisposable.dispose(),解除订阅关系,后面的观察者Observer就不会接受到onNext()了,但是观察则会Obserable会继续发送剩下的事件3,事件4,
- ObservableEmitter<T>专门用来发射数据。Disposable是用来解除订阅关系的,isDisposed() 返回为 false 的时候,接收器能正常接收事件,但当其为 true 的时候,接收器停止了接收。
2.just
作用:创建一个被观察对象,接受可变参数,并依次发射参数。
Observable.just(1,2,3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
mDisposable = disposable;
mTvLog.append("onSubscribe 获取到Disposable实例\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext -- "+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe 获取到Disposable实例
onNext -- 1
onNext -- 2
onNext -- 3
onComplete
3.map
作用:通过函数处理observable传递过来的数据(事件)
Observable.just(1,2,3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
mTvLog.append("map函数处理传递过来的数字\n");
//函数处理事件
String strResult = "数字变大2倍后:"+integer * 2;
return strResult;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
mTvLog.append("onSubscribe 获取到Disposable实例 线程:"+Thread.currentThread().getName()+"\n");
}
@Override
public void onNext(String s) {
mTvLog.append("onNext : "+s +" 线程:"+Thread.currentThread().getName()+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError 线程:"+Thread.currentThread().getName()+"\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete 线程:"+Thread.currentThread().getName()+"\n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe 获取到Disposable实例 线程:main
map函数处理传递过来的数字
map函数处理传递过来的数字
map函数处理传递过来的数字
onNext : 数字变大2倍后:2 线程:main
onNext : 数字变大2倍后:4 线程:main
onNext : 数字变大2倍后:6 线程:main
onComplete 线程:main
- map操作符依次处理observable传递过来的数据,处理完后再依次调用onNext()
4.zip
作用:用于合并事件,两两配对合并成 一个事件,最终配对出的 Observable 发射事件数目只和少的那个相同。
Observable.zip(getNumberObservable(), getLetterObservable(), new BiFunction<Integer ,String,String>() {
@Override
public String apply(Integer number, String letter) throws Exception {
mTvLog.append("zip number:"+number+" letter:"+letter +"\n");
return number + letter;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(String result) {
mTvLog.append("onNext result:"+result+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError "+e.getMessage()+"\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete \n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
//获取数字的Observable实例
private Observable getNumberObservable(){
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
mTvLog.append("NumberObservalbe 发射 1\n");
emitter.onNext(1);
mTvLog.append("NumberObservalbe 发射 2\n");
emitter.onNext(2);
mTvLog.append("NumberObservalbe 发射 3\n");
emitter.onNext(3);
emitter.onComplete();
}
});
}
//获取字母的Observable实例
private Observable getLetterObservable(){
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
mTvLog.append("NumberObservalbe 发射 A\n");
emitter.onNext("A");
mTvLog.append("NumberObservalbe 发射 B\n");
emitter.onNext("B");
mTvLog.append("NumberObservalbe 发射 C\n");
emitter.onNext("C");
mTvLog.append("NumberObservalbe 发射 D\n");
emitter.onNext("D");
emitter.onComplete();
}
});
}
日志:
onSubscribe
NumberObservalbe 发射 1
NumberObservalbe 发射 2
NumberObservalbe 发射 3
NumberObservalbe 发射 A
zip number:1 letter:A
onNext result:1A
NumberObservalbe 发射 B
zip number:2 letter:B
onNext result:2B
NumberObservalbe 发射 C
zip number:3 letter:C
onNext result:3C
onComplete
NumberObservalbe 发射 D
- 不能被配对的事件,会被舍弃掉
5.flatMap
作用:把一个发射器 Observable 通过某种方法转换为多个 Observables,然后再把这些分散的 Observables装进一个单一的发射器 Observable。flatMap 并不能保证事件的顺序,如果需要保证顺序,需要使用concatmap
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
mTvLog.append("Observable 发射 1 \n");
e.onNext(1);
mTvLog.append("Observable 发射 2 \n");
e.onNext(2);
mTvLog.append("Observable 发射 3 \n");
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
mTvLog.append("flatmap integer=" + integer + "\n");
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("value :" + integer + " - " + (i+1) );
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mTvLog.append("Consumer accept:" + s + "\n");
Log.e(mActivity.getClass().getSimpleName(), mTvLog.getText().toString());
}
});
日志:
Observable 发射 1
flatmap integer=1
Observable 发射 2
flatmap integer=2
Observable 发射 3
flatmap integer=3
Consumer accept:value :2 - 1
Consumer accept:value :2 - 2
Consumer accept:value :3 - 1
Consumer accept:value :1 - 1
Consumer accept:value :3 - 2
Consumer accept:value :3 - 3
Consumer accept:value :2 - 3
Consumer accept:value :1 - 2
Consumer accept:value :1 - 3
6.concatMap
作用:concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
mTvLog.append("Observable 发射 1 \n");
e.onNext(1);
mTvLog.append("Observable 发射 2 \n");
e.onNext(2);
mTvLog.append("Observable 发射 3 \n");
e.onNext(3);
e.onComplete();
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("value :" + integer + " - " + (i+1) );
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe \n" );
}
@Override
public void onNext(String s) {
mTvLog.append("onNext " + s + "\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError \n" );
}
@Override
public void onComplete() {
mTvLog.append("onComplete \n" );
Log.e(mActivity.getClass().getSimpleName(), mTvLog.getText().toString());
}
});
日志:
onSubscribe
Observable 发射 1
Observable 发射 2
Observable 发射 3
onNext value :1 - 1
onNext value :1 - 2
onNext value :1 - 3
onNext value :2 - 1
onNext value :2 - 2
onNext value :2 - 3
onNext value :3 - 1
onNext value :3 - 2
onNext value :3 - 3
onComplete
- concatMap操作符处理的时候,第一次是在main线程,后面的在计算的线程,这三次发射的线程一次是:main -> RxComputationThreadPool-1 -> RxComputationThreadPool-2
7.filter
作用:符合条件的事件才会被发射出去
Observable.just(1,2,3,4,5,6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
mTvLog.append("filter integer:"+integer+"\n");
return integer >3;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe \n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError \n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete \n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
filter integer:1
filter integer:2
filter integer:3
filter integer:4
onNext integer:4
filter integer:5
onNext integer:5
filter integer:6
onNext integer:6
onComplete
8.take
作用:接受一个 long 型参数 count ,代表至多接收 count 个数据
mTvLog.append("发射 1 ,2,3,4,5\n");
Observable.fromArray(1,2,3,4,5)
.take(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubcribe \n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
Observable 发射 1 ,2,3,4,5
onSubcribe
onNext integer:1
onNext integer:2
onComplete
9.doOnNext
作用:让订阅者在接收到数据之前做一些别的事情
mTvLog.append("Observable 发射 1\n");
Observable.just(1)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mTvLog.append("doOnNext 无返回值 在此保存数据 线程:"+Thread.currentThread().getName()+"\n");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
Observable 发射 1
onSubscribe
doOnNext 无返回值 在此保存数据 线程:main
onNext integer:1
onComplete
10.timer
作用:指定一个延时任务
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // timer 默认在新线程,所以需要切换回主线程
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
mTvLog.append(new Date().toString()+" 结束任务\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
Wed Dec 19 21:05:38 GMT+08:00 2018 开始执行2延时任务
Wed Dec 19 21:05:40 GMT+08:00 2018 结束任务
11.interval
作用:间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位,默认在新线程。例如倒计时
mTvLog.append("开始\n");
mDisposable = Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
mTvLog.append(new Date().toString() + " num="+aLong+"\n");
Log.e(mActivity.getClass().getSimpleName(),new Date().toString() + " num="+aLong);
}
});
Single
作用:Single只会接收一个参数,而SingleObserver只会调用onError或者onSuccess.只发射一条单一的数据,或者一条异常通知。要么成功,要么失败。
Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
mTvLog.append("Single 发射 1\n");
emitter.onSuccess(1);//异常通知与成功通知之间只能发射一个
}
}).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe \n");
}
@Override
public void onSuccess(Integer integer) {
mTvLog.append("onSuccess integer:"+integer+"\n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError "+e.getMessage()+"\n");
}
});
日志:
onSubscribe
Single 发射 1
onSuccess integer:1
12.Completable
作用:只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
mTvLog.append("Completable 发射 1\n");
emitter.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe \n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete \n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError \n");
}
});
日志:
onSubscribe
Completable 发射 1
onComplete
13.MayBe
作用:可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。
Maybe.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
mTvLog.append("MayBe 发射 1\n");
emitter.onSuccess(1);
emitter.onComplete();
}
}).subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscirbe\n");
}
@Override
public void onSuccess(Integer integer) {
mTvLog.append("onSuccess integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
日志:
onSubscribe
Completable 发射 1
onComplete
14.skip
作用:接受一个long型参数,代表跳过多少个数目的事件再开始接收
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
mTvLog.append("Observable 发射 1\n");
emitter.onNext(1);
mTvLog.append("Observable 发射 2\n");
emitter.onNext(2);
mTvLog.append("Observable 发射 3\n");
emitter.onNext(3);
emitter.onComplete();
}
}).skip(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
Observable 发射 1
Observable 发射 2
Observable 发射 3
onNext integer:3
onComplete
15.concat
作用:把两个发射器连接成一个发射器,有顺序的发射
Observable.concat(Observable.just(1,2),Observable.just("3"))
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe \n");
}
@Override
public void onNext(Object object) {
String result = "";
if (object instanceof Integer){
result = (int) object + "";
}else if (object instanceof String){
result = (String) object;
}
mTvLog.append("onNext object:"+result+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError \n");
}
@Override
public void onComplete() {
mTvLog.append("onCompelte\n");
Log.e(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
onNext object:1
onNext object:2
onNext object:3
onCompelte
16.distinct
作用:去重操作符
Observable.just(1,1,2,2,3)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext integer:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
onNext integer:1
onNext integer:2
onNext integer:3
onComplete
17.buffer
作用:将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。
Observable.just(1,2,3,4,5,6,7,8)
.buffer(3,2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(List<Integer> integers) {
mTvLog.append("onNext 集合:"+ Arrays.toString(integers.toArray())+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError \n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete \n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
onNext 集合:[1, 2, 3]
onNext 集合:[3, 4, 5]
onNext 集合:[5, 6, 7]
onNext 集合:[7, 8]
onComplete
18.debounce
作用:去除发送频率过快的项
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
mTvLog.append("Observable 发射1 睡490毫秒\n");
Thread.sleep(490);
emitter.onNext(2);
mTvLog.append("Observable 发射2 睡500毫秒\n");
Thread.sleep(500);
emitter.onNext(3);
mTvLog.append("Observalbe 发射3 睡510毫秒\n");
Thread.sleep(510);
emitter.onNext(4);
mTvLog.append("Observalbe 发射3 睡520毫秒\n");
Thread.sleep(520);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext 接收到:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
Observable 发射1 睡490毫秒
Observable 发射2 睡500毫秒
onNext 接收到:2
Observalbe 发射3 睡510毫秒
onNext 接收到:3
Observalbe 发射3 睡520毫秒
onNext 接收到:4
onComplete
19.defer
作用:defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。在某些情况下,等到最后一刻(即直到订阅时间)生成Observable可以确保此Observable包含最新的数据.
Observable<Integer> observableDefer = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
mTvLog.append("defer\n");
return Observable.just(1);
}
});
Observer<Integer> observerDefer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer value) {
mTvLog.append("onNext 接收到:"+value+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
};
observableDefer.subscribe(observerDefer);
日志:
defer
onSubscribe
onNext 接收到:1
onComplete
20.last
作用:接收发射的最后一个值,参数是没有值的时候的默认值
Observable.just(1,2,3,4,5,6).last(0)
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onSuccess(Integer integer) {
mTvLog.append("onSuccess 成功接收:"+integer+"\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
});
日志:
onSubscribe
onSuccess 成功接收:6
21.merge
作用:把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送,两个发射器不再同一线程。
@Override
protected void test() {
mTvLog.append("\n\n");
Observable.merge(getObservable_1(), getObservable_2())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
mainThreadTextLog("onNext = " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
/**
* 第一个Observable
*
* @return
*/
private Observable<String> getObservable_1() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Thread.sleep(500);
mainThreadTextLog("Observable_1发射 a");
emitter.onNext("a");
Thread.sleep(500);
mainThreadTextLog("Observable_1发射 b");
emitter.onNext("b");
Thread.sleep(500);
mainThreadTextLog("Observable_1发射 c");
emitter.onNext("c");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
}
/**
* 第二个Observable
*
* @return
*/
private Observable<String> getObservable_2() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Thread.sleep(300);
mainThreadTextLog("Observable_2发射 A");
emitter.onNext("A");
Thread.sleep(300);
mainThreadTextLog("Observable_2发射 B");
emitter.onNext("B");
Thread.sleep(300);
mainThreadTextLog("Observable_2发射 C");
emitter.onNext("C");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
}
日志:
Observable_2发射 A
onNext = A
Observable_1发射 a
onNext = a
Observable_2发射 B
onNext = B
Observable_2发射 C
onNext = C
Observable_1发射 b
onNext = b
22.reduce
作用:把一个被观察者中的多个事件进行压缩,最后发射压缩后的事件。聚合或压缩。
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer sum, Integer integer) throws Exception {
mTvLog.append("reduce sum:"+sum+" integer:"+integer+"\n");
return sum+ integer;
}
})
.subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onSuccess(Integer integer) {
mTvLog.append("onSuccess 接收到:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
reduce sum:1 integer:2
reduce sum:3 integer:3
reduce sum:6 integer:4
onSuccess 接收到:10
23.scan
作用:对发射的数据进行处理,发送每次的处理结果. reduce是对发射的数据进行处理,返回最终的处理结果。
Observable.just(1,2,3,4)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer sum, Integer integer) throws Exception {
mTvLog.append("sacn sum:"+sum+" integer:"+integer+"\n");
return sum+ integer;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("onNext 接收到:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
onSubscribe
onNext 接收到:1
scan sum:1 integer:2
onNext 接收到:3
scan sum:3 integer:3
onNext 接收到:6
scan sum:6 integer:4
onNext 接收到:10
onComplete
24.window
作用:有点类似buffer,window是把数据分割成了Observable,buffer是把数据分割成List
mTvLog.append("Observable 发射 1,2,3,4,5\n");
Observable.just(1,2,3,4,5)
.window(3,2)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("onSubscribe\n");
}
@Override
public void onNext(Observable<Integer> integerObservable) {
mTvLog.append("onNext "+integerObservable.toString()+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("onError\n");
}
@Override
public void onComplete() {
mTvLog.append("onComplete\n");
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
}
});
日志:
Observable 发射 1,2,3,4,5
onSubscribe
onNext io.reactivex.subjects.UnicastSubject@4a78b884
onNext io.reactivex.subjects.UnicastSubject@4a7b00fc
onNext io.reactivex.subjects.UnicastSubject@4a79b990
onComplete
25.PublishSubject
作用:一旦订阅了,将所有随后观察到的项发送给订阅方。
PublishSubject<Integer> publishSubject = PublishSubject.create();
//绑定第一个订阅者
publishSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("第1个订阅者 onSubscribe\n");
}
@Override
public void onNext(Integer value) {
mTvLog.append("第1个订阅者 onNext 接收到:"+value+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("第1个订阅者 onError\n");
}
@Override
public void onComplete() {
mTvLog.append("第1个订阅者 onComplete\n");
}
});
//发射数据
publishSubject.onNext(1);
publishSubject.onNext(2);
mTvLog.append(" - - - - \n");
//绑定第二个订阅者
publishSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("第2个订阅者 onSubscribe\n");
}
@Override
public void onNext(Integer value) {
mTvLog.append("第2个订阅者 onNext 接收到:"+value+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("第2个订阅者 onError\n");
}
@Override
public void onComplete() {
mTvLog.append("第2个订阅者 onComplete\n");
}
});
//发射数据
publishSubject.onNext(3);
publishSubject.onNext(4);
publishSubject.onComplete();
日志:
第1个订阅者 onSubscribe
第1个订阅者 onNext 接收到:1
第1个订阅者 onNext 接收到:2
- - - -
第2个订阅者 onSubscribe
第1个订阅者 onNext 接收到:3
第2个订阅者 onNext 接收到:3
第1个订阅者 onNext 接收到:4
第2个订阅者 onNext 接收到:4
第1个订阅者 onComplete
第2个订阅者 onComplete
26.AsyncSubject
作用:在调用 onComplete() 之前,除了 subscribe() 其它的操作都会被缓存,在调用 onComplete() 之后只有最后一个 onNext() 会生效。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
//第一个订阅者
asyncSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("第1个订阅者 onSubscirbe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("第1个订阅者 onNext value:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("第1个订阅者 onError\n");
}
@Override
public void onComplete() {
mTvLog.append("第1个订阅者 onComplete\n");
}
});
asyncSubject.onNext(1);
asyncSubject.onNext(2);
asyncSubject.onComplete();
mTvLog.append(" - - - \n");
//第2个订阅者
asyncSubject.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
mTvLog.append("第2个订阅者 onSubscirbe\n");
}
@Override
public void onNext(Integer integer) {
mTvLog.append("第2个订阅者 onNext value:"+integer+"\n");
}
@Override
public void onError(Throwable e) {
mTvLog.append("第2个订阅者 onError\n");
}
@Override
public void onComplete() {
mTvLog.append("第2个订阅者 onComplete\n");
}
});
asyncSubject.onNext(3);
asyncSubject.onNext(4);
asyncSubject.onComplete();
日志:
第1个订阅者 onSubscirbe
第1个订阅者 onNext value:2
第1个订阅者 onComplete
- - -
第2个订阅者 onSubscirbe
第2个订阅者 onNext value:2
第2个订阅者 onComplete
- asyncSubject调用了onComplete后,后面的操作就不再生效。
27.BehaviorSubject
作用:发射Observable最近的数据,如果Observable还没有开始发射数据,则发射BehaviorSubject的默认数据
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.onNext(3);
behaviorSubject.onNext(4);
behaviorSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mTvLog.append("accept 接收到value:"+integer+"\n");
}
});
behaviorSubject.onNext(5);
behaviorSubject.onNext(6);
Log.i(mActivity.getClass().getSimpleName(),mTvLog.getText().toString());
日志:
accept 接收到value:4
accept 接收到value:5
accept 接收到value:6
- 发射的是订阅前发射的最后一个事件,加上订阅之后发射的所有事件
28.Flowable
作用:支持背压。背压指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
- 背压:当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
/**
* 展示背压
*/
private void showBackpressure(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while (true){
i++;
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(5000);
Log.e("accept","接收到 " + integer);
}
});
}
上面的例子代码,由于上游通过Observable发射数据的速度大于下游通过Consumer接收处理数据的速度,而且上下游分别运行在不同的线程中,下游对数据的接收处理不会堵塞上游对数据的发射,造成上游数据积压,内存不断增加,最后便会导致内存溢出。
Subscription 用于响应式拉取。来设置下游对数据的请求数量,上游可以根据下游的需求量,按需发送数据。如果request设置下游的需求量为零,上游Flowable发射的数据不会交给下游Subscriber处理。
背压策略
- BackpressureStrategy.MISSING:在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符
- BackpressureStrategy.ERROR: 在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常
- BackpressureStrategy.BUFFER:此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM
- BackpressureStrategy.DROP:在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据
- BackpressureStrategy.LATEST:Subscriber在接收完成之前,接收的数据是Flowable发射的最后一条数据
背压正确处理姿势
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
if (e.requested() == 0) continue;//此处添加代码,让flowable按需发送数据
Log.e("Flowable","发射 --->"+i);
i++;
e.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
s.request(1); //设置初始请求数据量为1
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(50);
Log.e("Flowable","接收到 --->"+integer);
mSubscription.request(1);//每接收到一条数据增加一条请求量
} catch (InterruptedException ignore) {
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});