I. scan syntax
// Source Observable shared by both examples: emits 1, 2, 3 and then completes.
public Observable<Integer> getRxJavaCreateExampleData() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
            emitter.onNext(1);
            LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
            emitter.onNext(2);
            // Thread.sleep(5000);
            LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
            emitter.onNext(3);
            emitter.onComplete();
            LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
            // Sent after onComplete(), so downstream never receives this item.
            emitter.onNext(4);
        }
    });
}
public void rxJavaScanExample() {
    Disposable disposable = model.getRxJavaCreateExampleData()
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            // No initial value: the first item is emitted as is, then each later item is added to the running total.
            .scan(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    LogUtils.error(TAG, "rxJavaScanExample--:" + Thread.currentThread().getName() + "-scan-:" + integer + "---" + integer2);
                    return integer + integer2;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    // The "rxJavaReduceExample" label is kept so the output matches the log below.
                    LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
                }
            });
    compositeDisposable.add(disposable);
}
Log output
08-19 15:16:15.864 12835-12880/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:1---2
08-19 15:16:15.865 12835-12880/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:3---3
08-19 15:16:15.867 12835-12835/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:1
rxJavaReduceExample--:main-consumer-:3
rxJavaReduceExample--:main-consumer-:6
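The log above shows two calls to the accumulator but three values reaching the consumer. A minimal plain-Java sketch of that behavior follows. It is only a model of what the log shows, not RxJava's implementation, and the class name SeedlessScanSketch and its scan helper are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of scan without an initial value (not RxJava code).
final class SeedlessScanSketch {

    // Returns the values a downstream consumer would receive, in order.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T running = null;
        boolean first = true;
        for (T item : source) {
            if (first) {
                // The first item becomes the running value without calling the accumulator.
                running = item;
                first = false;
            } else {
                running = accumulator.apply(running, item);
            }
            emitted.add(running);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> sums = scan(Arrays.asList(1, 2, 3), Integer::sum);
        System.out.println(sums); // prints [1, 3, 6]
    }
}
```

For the source 1, 2, 3 the accumulator runs for the pairs (1, 2) and (3, 3), which matches the two "-scan-" lines in the log.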
II. scan syntax 2
public void rxJavaScanExample() {
    Disposable disposable = model.getRxJavaCreateExampleData()
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            // Initial value 10: it is emitted first, and every source item is added on top of it.
            .scan(10, new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    LogUtils.error(TAG, "rxJavaScanExample--:" + Thread.currentThread().getName() + "-scan-:" + integer + "---" + integer2);
                    return integer + integer2;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    // The "rxJavaReduceExample" label is kept so the output matches the log below.
                    LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
                }
            });
    compositeDisposable.add(disposable);
}
Log output
08-19 15:14:12.876 12633-12692/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:10---1
08-19 15:14:12.877 12633-12692/com.example.zhang E/MainPresenter: rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:11---2
rxJavaScanExample--:RxCachedThreadScheduler-1-scan-:13---3
08-19 15:14:12.883 12633-12633/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:10
rxJavaReduceExample--:main-consumer-:11
rxJavaReduceExample--:main-consumer-:13
rxJavaReduceExample--:main-consumer-:16
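With an initial value, the log shows three accumulator calls and four values at the consumer, starting with the initial value itself. The plain-Java sketch below models that. It is an illustration under the assumption that the log reflects the general behavior, not RxJava's actual code, and SeededScanSketch is a hypothetical name. The second call also shows that the accumulated type may differ from the item type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of scan with an initial value (not RxJava code).
final class SeededScanSketch {

    // Returns the values a downstream consumer would receive, in order.
    static <T, R> List<R> scan(R initialValue, List<T> source, BiFunction<R, ? super T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R running = initialValue;
        emitted.add(running); // the initial value is delivered before any source item is processed
        for (T item : source) {
            running = accumulator.apply(running, item);
            emitted.add(running);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> sums = scan(10, Arrays.asList(1, 2, 3), Integer::sum);
        System.out.println(sums); // prints [10, 11, 13, 16]

        // The accumulated type (String) differs from the item type (Integer).
        List<String> digits = scan("", Arrays.asList(1, 2, 3), (s, i) -> s + i);
        System.out.println(digits.size()); // prints 4
    }
}
```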
Summary
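1. scan(BiFunction<T, T, T> accumulator) accumulates the emitted items and emits every intermediate result.
2. scan(final R initialValue, BiFunction<R, ? super T, R> accumulator) does the same, but starts the accumulation from initialValue, which is emitted first.
3. scan vs. reduce: reduce emits only the final result, once; scan emits each intermediate result.
4. The number of values scan emits equals one (for the initial value) plus the number of items the emitter sends. Without an initial value, the first item the emitter sends serves as the initial value and is emitted as is, so the count equals the number of items sent.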
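Points 3 and 4 can be checked with a small plain-Java sketch. Here java.util.stream's reduce stands in for RxJava's reduce, and the class ScanVersusReduceSketch with its helpers runningSums, finalSum and expectedScanEmissions are hypothetical names invented for this comparison, so treat it as an illustration rather than the library's behavior in every case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical comparison of scan-style and reduce-style accumulation (not RxJava code).
final class ScanVersusReduceSketch {

    // scan-style: one output per input, each being the sum so far.
    static List<Integer> runningSums(List<Integer> source) {
        List<Integer> emitted = new ArrayList<>();
        int running = 0;
        for (int item : source) {
            running += item;
            emitted.add(running);
        }
        return emitted;
    }

    // reduce-style: a single final result, or empty when there are no items.
    static Optional<Integer> finalSum(List<Integer> source) {
        return source.stream().reduce(Integer::sum);
    }

    // Point 4 as a formula: the initial value adds exactly one extra emission.
    static int expectedScanEmissions(int itemCount, boolean hasInitialValue) {
        return hasInitialValue ? itemCount + 1 : itemCount;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        System.out.println(runningSums(items));               // prints [1, 3, 6]
        System.out.println(finalSum(items));                  // prints Optional[6]
        System.out.println(expectedScanEmissions(3, false));  // prints 3
        System.out.println(expectedScanEmissions(3, true));   // prints 4
    }
}
```

The counts 3 and 4 agree with the number of "-consumer-" lines in the two logs above.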