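(1) concat and concatArray
Combines several Observables into one and emits their items serially: each source is subscribed only after the previous one has completed, so items arrive in source order.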
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2));
list.add(Observable.just(3, 4));
list.add(Observable.just(5, 6));
// concat: the sources are passed as an Iterable
Observable.concat(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
// concatArray: the sources are passed as varargs
Observable.concatArray(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
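Both approaches produce the same output: 1 2 3 4 5 6
(2) merge and mergeArray
Combines several Observables into one and subscribes to all of them at once, so items are emitted in the order they arrive on the timeline rather than in source order.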
List<Observable<Integer>> list = new ArrayList<>();
// Only the first source is delayed, by 2 seconds
list.add(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS));
list.add(Observable.just(3, 4));
list.add(Observable.just(5, 6));
list.add(Observable.just(7, 8));
// merge: the sources are passed as an Iterable
Observable
        .merge(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
// mergeArray: the sources are passed as varargs
Observable
        .mergeArray(Observable.just(1, 2).delay(2000, TimeUnit.MILLISECONDS), Observable.just(3, 4), Observable.just(5, 6), Observable.just(7, 8))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
Both approaches produce the same output: 3 4 5 6 7 8 1 2 (the first source is delayed by 2 seconds, so its items arrive last).
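The ordering difference between merge and concat can be modeled without RxJava at all. The following is a minimal plain-Java sketch, not RxJava code: the `Event` class, the method names and the arrival times (0 ms and 2000 ms) are assumptions made up for illustration. Each source is a list of timed events; `mergeOrder` sorts all events by arrival time, while `concatOrder` plays each source to the end before starting the next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MergeOrderSketch {

    /** One emission: a value plus the time (ms after subscription) at which it arrives. */
    public static final class Event {
        final long timeMs;
        final int value;

        Event(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    /** merge: every source runs at once, so values come out in arrival-time order. */
    public static List<Integer> mergeOrder(List<List<Event>> sources) {
        List<Event> all = new ArrayList<>();
        for (List<Event> source : sources) {
            all.addAll(source);
        }
        // List.sort is stable: events with equal times keep their source order.
        all.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        List<Integer> out = new ArrayList<>();
        for (Event e : all) {
            out.add(e.value);
        }
        return out;
    }

    /** concat: a source starts only after the previous one completes, so source order wins. */
    public static List<Integer> concatOrder(List<List<Event>> sources) {
        List<Integer> out = new ArrayList<>();
        for (List<Event> source : sources) {
            for (Event e : source) {
                out.add(e.value);
            }
        }
        return out;
    }

    /** The four sources from the merge example; only the first is delayed (assumed 2000 ms). */
    public static List<List<Event>> sample() {
        List<List<Event>> sources = new ArrayList<>();
        sources.add(Arrays.asList(new Event(2000, 1), new Event(2000, 2)));
        sources.add(Arrays.asList(new Event(0, 3), new Event(0, 4)));
        sources.add(Arrays.asList(new Event(0, 5), new Event(0, 6)));
        sources.add(Arrays.asList(new Event(0, 7), new Event(0, 8)));
        return sources;
    }

    public static void main(String[] args) {
        System.out.println(mergeOrder(sample()));  // [3, 4, 5, 6, 7, 8, 1, 2]
        System.out.println(concatOrder(sample())); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

The stable sort is what keeps 3 4 ahead of 5 6 and 7 8: the undelayed sources all arrive at time 0, so they stay in the order in which they were subscribed.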
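(3) concatDelayError and mergeDelayError
concatDelayError: combines several Observables and emits their items in source order. If a source signals an error, emission is not interrupted immediately; the error is held back and delivered after the remaining sources have finished.
mergeDelayError: combines several Observables and emits their items in parallel. If a source signals an error, emission is not interrupted immediately; the error is held back and delivered after the remaining sources have finished.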
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onError(new NullPointerException("exception"));
        // Has no effect: the emitter has already terminated with onError
        e.onComplete();
    }
}).delay(2000, TimeUnit.MILLISECONDS));
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(3);
        e.onNext(4);
        e.onError(new NullPointerException("exception"));
        e.onComplete();
    }
}));
list.add(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onError(new NullPointerException("exception"));
        e.onComplete();
    }
}));
Observable
        .concatDelayError(list)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d("aaa", "an error occurred");
            }
        });
Result:
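To make the difference between failing fast and delaying errors concrete, here is a minimal plain-Java model. It does not use RxJava: the `Source` interface, the `Result` holder and both methods are hypothetical names invented for illustration, the sources run synchronously, and the `delay` call from the snippet above is deliberately left out. It is therefore a sketch of the general idea, not a prediction of that snippet's log output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class DelayErrorSketch {

    /** A source pushes values into the sink and may fail part-way (hypothetical interface). */
    public interface Source {
        void emitTo(Consumer<Integer> sink) throws Exception;
    }

    /** What the subscriber ends up with: the values received and the errors reported. */
    public static final class Result {
        public final List<Integer> values = new ArrayList<>();
        public final List<Exception> errors = new ArrayList<>();
    }

    /** Fail-fast concatenation: the first error ends everything. */
    public static Result concatFailFast(List<Source> sources) {
        Result result = new Result();
        for (Source source : sources) {
            try {
                source.emitTo(result.values::add);
            } catch (Exception e) {
                result.errors.add(e);
                return result; // later sources are never run
            }
        }
        return result;
    }

    /** Delayed-error concatenation: remember each error, keep going, report at the end. */
    public static Result concatDelayError(List<Source> sources) {
        Result result = new Result();
        for (Source source : sources) {
            try {
                source.emitTo(result.values::add);
            } catch (Exception e) {
                result.errors.add(e); // held back until all sources have run
            }
        }
        return result;
    }

    /** Emits two values and then fails, like the sources in the example above. */
    public static Source failingAfter(int a, int b) {
        return sink -> {
            sink.accept(a);
            sink.accept(b);
            throw new IllegalStateException("failed after " + b);
        };
    }

    public static List<Source> sample() {
        return Arrays.asList(failingAfter(1, 2), failingAfter(3, 4), failingAfter(5, 6));
    }

    public static void main(String[] args) {
        Result fast = concatFailFast(sample());
        System.out.println(fast.values + " errors=" + fast.errors.size());       // [1, 2] errors=1
        Result delayed = concatDelayError(sample());
        System.out.println(delayed.values + " errors=" + delayed.errors.size()); // [1, 2, 3, 4, 5, 6] errors=3
    }
}
```

The `errors` list stands in for what the subscriber's error callback eventually receives; when several sources fail, RxJava typically bundles the held-back errors into a single CompositeException.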
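(4) zip
Combines the items of several Observables and emits the combined results. Items are matched one to one by position (the first with the first, the second with the second, and so on), so the number of combined items equals the length of the shortest source.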
// Declared with type parameters instead of raw Observable types
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        SystemClock.sleep(1000);
        e.onNext(2);
        SystemClock.sleep(1000);
        e.onNext(3);
        SystemClock.sleep(1000);
        e.onNext(4);
        SystemClock.sleep(1000);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("A");
        SystemClock.sleep(1000);
        e.onNext("B");
        SystemClock.sleep(1000);
        e.onNext("C");
        SystemClock.sleep(1000);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer a, String b) throws Exception {
        return a + b;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d("aaa", s);
    }
});
Result: 1A 2B 3C (the 4 from observable1 has no partner in observable2, so it is never emitted)
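The one-to-one pairing rule is easy to see on plain lists. This is a minimal sketch in plain Java rather than RxJava: `ZipSketch` and its `zip` method are hypothetical names, and timing and threading are ignored so that only the pairing rule remains.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ZipSketch {

    /** Pairs items by position and stops when the shorter input runs out. */
    public static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList(1, 2, 3, 4),
                Arrays.asList("A", "B", "C"),
                (a, b) -> a + b);
        System.out.println(zipped); // [1A, 2B, 3C]
    }
}
```

The fourth integer is dropped because there is no fourth string to pair it with, which is the same reason the RxJava example logs only three lines.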
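(5) combineLatest
Combines the sources along a shared timeline: once every source has emitted at least one item, each new item from any source is combined with the latest item from each of the others, and the result is emitted.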
// Declared with type parameters instead of raw Observable types
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        SystemClock.sleep(700);
        e.onNext(1);
        SystemClock.sleep(700);
        e.onNext(2);
        SystemClock.sleep(700);
        e.onNext(3);
        SystemClock.sleep(700);
        e.onNext(4);
        SystemClock.sleep(700);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("A");
        SystemClock.sleep(600);
        e.onNext("B");
        SystemClock.sleep(600);
        e.onNext("C");
        SystemClock.sleep(600);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io());
Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
    @Override
    public String apply(Integer a, String b) throws Exception {
        return a + b;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d("aaa", s);
    }
});
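Next, let's draw a diagram based on the code.
Draw two timelines, one for observable1 and one for observable2, placing each item at the time given by the delays in the code, then look at where the two timelines overlap.
With these delays, the output of the code is: 1B 1C 2C 3C 4C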
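The timeline reasoning can also be replayed in code. The sketch below is plain Java, not RxJava: `CombineLatestSketch` is a hypothetical name, and the emission times are assumptions read off the sleep calls in the example (1, 2, 3, 4 at 700, 1400, 2100, 2800 ms; A, B, C at 0, 600, 1200 ms), ignoring scheduling jitter.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatestSketch {

    /** Replays two timelines (sorted by time) and pairs each new item with the other side's latest. */
    public static List<String> combineLatest(long[] timesA, int[] valuesA,
                                             long[] timesB, String[] valuesB) {
        List<String> out = new ArrayList<>();
        Integer latestA = null;
        String latestB = null;
        int i = 0;
        int j = 0;
        while (i < timesA.length || j < timesB.length) {
            // Take whichever pending event happens first.
            boolean takeA = j >= timesB.length
                    || (i < timesA.length && timesA[i] <= timesB[j]);
            if (takeA) {
                latestA = valuesA[i++];
            } else {
                latestB = valuesB[j++];
            }
            // Nothing is emitted until both sides have produced at least one item.
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> result = combineLatest(
                new long[] {700, 1400, 2100, 2800}, new int[] {1, 2, 3, 4},
                new long[] {0, 600, 1200}, new String[] {"A", "B", "C"});
        System.out.println(result); // [1B, 1C, 2C, 3C, 4C]
    }
}
```

A and B on their own produce nothing because observable1 has not emitted yet; by the time 1 arrives at 700 ms, the latest string is already B.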
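(6) combineLatestDelayError
Works like concatDelayError() / mergeDelayError(): it is the error-delaying variant, which has already been covered above.
(7) reduce
Aggregates all the items emitted by the Observable into a single item and emits that item.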
Observable.just(1, 2, 3, 4, 5)
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
The log shows 15.
This is equivalent to computing 1+2+3+4+5 = 15 and then emitting 15.
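The same fold exists in the JDK, which makes for a quick comparison. A minimal sketch using `java.util.stream` instead of RxJava (the class and method names are hypothetical): the accumulator is applied pairwise from left to right, and the result is wrapped because an empty input has nothing to reduce. In RxJava 2, `reduce` without a seed returns a `Maybe` for the same reason; `Optional` plays that role here.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ReduceSketch {

    /** Folds the values pairwise: ((((1 + 2) + 3) + 4) + 5). */
    public static Optional<Integer> sum(Stream<Integer> values) {
        return values.reduce((a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(sum(Stream.of(1, 2, 3, 4, 5)));    // Optional[15]
        System.out.println(sum(Stream.<Integer>empty()));     // Optional.empty
    }
}
```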
(8) collect
Collects the items emitted by the Observable into a single data structure.
Observable
        .just(1, 2, 3, 4, 5, 6)
        .collect(new Callable<ArrayList<Integer>>() {
            // Supplies the empty container
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
        }, new BiConsumer<ArrayList<Integer>, Integer>() {
            // Adds each emitted item to the container
            @Override
            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                list.add(integer);
            }
        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> list) throws Exception {
                for (int result : list) {
                    Log.d("aaa", String.valueOf(result));
                }
            }
        });
Result: 1 2 3 4 5 6
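The two arguments of collect map directly onto a plain loop. A minimal sketch in plain Java, with a hypothetical `collect` helper that is not part of RxJava: the first argument creates the empty container and the second is called once per item to put that item into it. In RxJava 2 the operator returns a `Single`, since exactly one container comes out at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class CollectSketch {

    /** Creates one container, feeds every item into it, and returns it. */
    public static <T, C> C collect(Iterable<T> source, Supplier<C> initial, BiConsumer<C, T> collector) {
        C container = initial.get();
        for (T item : source) {
            collector.accept(container, item);
        }
        return container;
    }

    public static void main(String[] args) {
        ArrayList<Integer> list = collect(
                Arrays.asList(1, 2, 3, 4, 5, 6),
                ArrayList::new,
                (container, item) -> container.add(item));
        System.out.println(list); // [1, 2, 3, 4, 5, 6]
    }
}
```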
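(9) startWith and startWithArray
startWith: prepends an item or a sequence of items to an existing stream, so they are emitted before the stream's own items.
startWith accepts a single item, an Iterable of items, or an Observable.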
Observable.just(1, 2, 3)
        .startWith(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
Result: 4 1 2 3
startWithArray: prepends several items, passed as varargs, to an existing stream.
Observable.just(1, 2, 3)
        .startWithArray(4, 5)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d("aaa", String.valueOf(integer));
            }
        });
Result: 4 5 1 2 3
(10) count
Counts the number of items emitted by the Observable.
Observable.just(1, 2, 3, 4)
        .count()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d("aaa", String.valueOf(aLong));
            }
        });
Result: 4