一、创建操作符
1、create
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello Observer");
e.onComplete();
}
});
说明:创建一个被观察者Observable
2、just
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
打印:
=================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete
说明:创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
3、fromArray
Integer[] array = new Integer[]{1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
打印:
=================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete
说明:这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。
4、fromCallable
Observable.fromCallable(new Callable < Integer > () {
@Override
public Integer call() throws Exception {
return 1;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "================accept " + integer);
}
});
================accept 1
说明:这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。
5、fromFuture
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
@Override
public String call() throws Exception {
Log.d(TAG, "CallableDemo is Running");
return "返回结果";
}
});
Observable.fromFuture(futureTask)
.doOnSubscribe(new Consumer < Disposable > () {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();
}
})
.subscribe(new Consumer < String > () {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "================accept " + s);
}
});
打印:
CallableDemo is Running
================accept 返回结果
说明:参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。
doOnSubscribe() 的作用就是只有订阅时才会发送事件。
6、fromIterable
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=================onComplete ");
}
});
打印:
=================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete
说明:直接发送一个 List 集合数据给观察者
8、defer
// i 要定义为成员变量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 200;
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
打印:
================onNext 200
================onNext 300
说明:因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。
9、timer
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===============onSubscribe " + d);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "===============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
2019-08-19 13:30:09.819 26417-26417/com.xbox D/SDK: ===============onSubscribe null
2019-08-19 13:30:14.820 26417-26446/com.xbox D/SDK: ===============onNext 0
说明:timer的第一个参数是延时5秒,第二个参数是单位。当执行完onSubscribe方法后延时5秒执行onNext方法,返回的参数为0
10、interval
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
==============onSubscribe
==============onNext 0
==============onNext 1
==============onNext 2
==============onNext 3
.............
.............
说明:第一个参数为每隔多少秒执行onNext方法(从0开始),当执行完onSubscribe后,每隔2秒执行onNext方法
11、intervalRange
Observable.intervalRange(4, 5, 8, 3, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
==============onSubscribe
==============onNext 4
==============onNext 5
==============onNext 6
==============onNext 7
==============onNext 8
说明:
第一个参数:从4开始计数
第二个参数:一共执行onNext5次(五个值)
第三个参数:当执行完onSubscribe方法之后,隔8秒开始执行第一个onNext方法
第四个参数:执行完第一个onNext之后,每隔3秒执行下一个onNext方法
依次+1递增
12、range
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Integer aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
==============onSubscribe
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6
说明:从2开始执行5次onNext方法,+1递增
13、rangeLong
说明:与 range() 一样,只是数据类型为 Long 这里就不上代码了
14、 empty() & never() & error()
Observable.empty()
.subscribe(new Observer < Object > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "==================onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete");
}
});
说明:
empty():直接发送 onComplete() 事件
never():不发送任何事件
error():发送 onError() 事件
二、转换操作符
1、map
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
@Override
public String apply(Integer integer) throws Exception {
return "I'm " + integer;
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "===================onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, "===================onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3
说明:map 可以将被观察者发送的数据类型转变成其他的类型,上述代码的作用是将 Integer 类型的数据转换成 String。
2、flatMap
List<Person> personList = new ArrayList<>();
List<Plan> plansList = new ArrayList<>();
List<String> actionList = new ArrayList<>();
actionList.add("玩游戏");
actionList.add("写作业");
actionList.add("看书");
Plan plan = new Plan("1", "小明的计划");
plan.setActionList(actionList);
plansList.add(plan);
Person person = new Person("小明", plansList);
personList.add(person);
List<String> actionList2 = new ArrayList<>();
List<Plan> plansList2 = new ArrayList<>();
actionList2.add("开电脑");
actionList2.add("打王者");
actionList2.add("吃鸡");
Plan plan2 = new Plan("2", "小红的计划");
plan2.setActionList(actionList2);
plansList2.add(plan2);
Person person2 = new Person("小红", plansList2);
personList.add(person2);
Observable.fromIterable(personList)
.flatMap(new Function<Person, ObservableSource<Plan>>() {
@Override
public ObservableSource<Plan> apply(Person person) {
return Observable.fromIterable(person.getPlanList());
}
})
.flatMap(new Function<Plan, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Plan plan) throws Exception {
return Observable.fromIterable(plan.getActionList());
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "==================action: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
public class Person {
private String name;
private List<Plan> planList;
public Person(String name, List<Plan> planList) {
this.name = name;
this.planList = planList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Plan> getPlanList() {
return planList;
}
public void setPlanList(List<Plan> planList) {
this.planList = planList;
}
}
public class Plan {
private String time;
private String content;
private List<String> actionList = new ArrayList<>();
public Plan(String time, String content) {
this.time = time;
this.content = content;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public List<String> getActionList() {
return actionList;
}
public void setActionList(List<String> actionList) {
this.actionList = actionList;
}
}
打印:
==================action: 玩游戏
==================action: 写作业
==================action: 看书
==================action: 开电脑
==================action: 打王者
==================action: 吃鸡
说明:通过flatMap打印出所有的action
3、concatMap
说明:与flatMap用法基本一样,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的
4、buffer
Observable.just(1, 2, 3, 4, 5,6,7)
.buffer(3, 2)
.subscribe(new Observer < List < Integer >> () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List < Integer > integers) {
Log.d(TAG, "================缓冲区大小: " + integers.size());
for (Integer i: integers) {
Log.d(TAG, "================元素: " + i);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
================缓冲区大小: 3
================元素: 1
================元素: 2
================元素: 3
================缓冲区大小: 3
================元素: 3
================元素: 4
================元素: 5
================缓冲区大小: 3
================元素: 5
================元素: 6
================元素: 7
================缓冲区大小: 1
================元素: 7
说明:buffer从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
第一个参数代表缓冲区元素的数量,第二个参数表示下一次事件序列的时候要跳过多少元素,比如例子中第二个参数是2,那么在次遍历就从3开始,跳过了1和2。
5、groupBy
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 3;
}
})
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "====================onSubscribe ");
}
@Override
public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
Log.d(TAG, "====================onNext ");
integerIntegerGroupedObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "====================GroupedObservable onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "====================GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "====================GroupedObservable onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "====================GroupedObservable onComplete ");
}
});
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "====================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "====================onComplete ");
}
});
打印:
====================onSubscribe
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext groupName: 1 value: 1
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext groupName: 2 value: 2
====================onNext
====================GroupedObservable onSubscribe
====================GroupedObservable onNext groupName: 0 value: 3
====================GroupedObservable onNext groupName: 1 value: 4
====================GroupedObservable onNext groupName: 2 value: 5
====================GroupedObservable onNext groupName: 0 value: 6
====================GroupedObservable onNext groupName: 1 value: 7
====================GroupedObservable onNext groupName: 2 value: 8
====================GroupedObservable onNext groupName: 0 value: 9
====================GroupedObservable onNext groupName: 1 value: 10
====================GroupedObservable onComplete
====================GroupedObservable onComplete
====================onComplete
说明:在 groupBy() 方法返回的参数是分组的名字(integerIntegerGroupedObservable.getKey()),每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组。
6、scan
Observable.just(1, 2, 3)
.scan(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "====================apply ");
Log.d(TAG, "====================integer " + integer);
Log.d(TAG, "====================integer2 " + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "====================accept " + integer);
}
});
打印:
====================accept 1
====================apply
====================integer 1
====================integer2 2
====================accept 3
====================apply
====================integer 3
====================integer2 3
====================accept 6
说明:scan将数据以一定的逻辑聚合起来,相当于从第一个元素开始(1)开始 元素1和元素2相加和与元素3相加
7、window
Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=====================onSubscribe ");
}
@Override
public void onNext(Observable < Integer > integerObservable) {
integerObservable.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=====================integerObservable onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=====================integerObservable onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=====================integerObservable onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=====================integerObservable onComplete ");
}
});
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=====================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=====================onComplete ");
}
});
打印:
=====================onSubscribe
=====================integerObservable onSubscribe
=====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 5
=====================integerObservable onComplete
=====================onComplete
说明:window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。例中,window() 将 1~5 的事件分成了3组。
三、组合操作符
1、concat
Observable.concat(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
说明:可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
2、concatArray
Observable.concatArray(Observable.just(1, 2),
Observable.just(3, 4),
Observable.just(5, 6),
Observable.just(7, 8),
Observable.just(9, 10))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
================onNext 1
================onNext 2
================onNext 3
================onNext 4
================onNext 5
================onNext 6
================onNext 7
================onNext 8
================onNext 9
================onNext 10
说明:与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
3、merge
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
@Override
public String apply(Long aLong) throws Exception {
return "A" + aLong;
}
}),
Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
@Override
public String apply(Long aLong) throws Exception {
return "B" + aLong;
}
}))
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "=====================onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印:
=====================onNext A0
=====================onNext B0
=====================onNext A1
=====================onNext B1
=====================onNext B2
=====================onNext A2
=====================onNext B3
=====================onNext A3
=====================onNext A4
=====================onNext B4
=====================onNext A5
=====================onNext B5
..............
说明:这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。
把merge换成concat之后打印如下:
=====================onNext A0
=====================onNext A1
=====================onNext A2
=====================onNext A3
=====================onNext A4
..............
说明:只有等到第一个被观察者发送完事件之后,第二个被观察者才会发送事件。mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者,这里就不再赘述了。
4、concatArrayDelayError() & mergeArrayDelayError()
Observable.concatArray(
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
}
});
打印:
===================onNext 1
===================onError
说明:在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError();
从结果可以知道,确实中断了,现在换用 concatArrayDelayError(),代码如下:
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onError(new NumberFormatException());
}
}), Observable.just(2, 3, 4))
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
}
});
打印:
===================onNext 1
===================onNext 2
===================onNext 3
===================onNext 4
===================onError
说明:出现错误延迟回调
5、zip
Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 发送的事件 " + s1);
return s1;
}}),
Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
打印:
===================onSubscribe
===================A 发送的事件 A1
===================B 发送的事件 B1
===================onNext A1B1
===================A 发送的事件 A2
===================B 发送的事件 B2
===================onNext A2B2
===================A 发送的事件 A3
===================B 发送的事件 B3
===================onNext A3B3
===================A 发送的事件 A4
===================B 发送的事件 B4
===================onNext A4B4
===================A 发送的事件 A5
===================B 发送的事件 B5
===================onNext A5B5
===================onComplete
说明:会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样,上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。
6、combineLatest() & combineLatestDelayError()
Observable.combineLatest(
Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
.map(new Function < Long, String > () {@Override
public String apply(Long aLong) throws Exception {
String s1 = "A" + aLong;
Log.d(TAG, "===================A 发送的事件 " + s1);
return s1;
}
}),
Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
.map(new Function < Long, String > () {@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.d(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction < String, String, String > () {@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "===================最终接收到的事件 " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "===================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
打印:
===================onSubscribe
===================A 发送的事件 A1
===================A 发送的事件 A2
===================B 发送的事件 B1
===================最终接收到的事件 A2B1
===================A 发送的事件 A3
===================最终接收到的事件 A3B1
===================A 发送的事件 A4
===================B 发送的事件 B2
===================最终接收到的事件 A4B1
===================最终接收到的事件 A4B2
===================B 发送的事件 B3
===================最终接收到的事件 A4B3
===================B 发送的事件 B4
===================最终接收到的事件 A4B4
===================B 发送的事件 B5
===================最终接收到的事件 A4B5
===================onComplete
说明:combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔2秒发送一次事件。当发送 A1 事件之后,因为 B 并没有发送任何事件,所以根本不会发生结合。当 B 发送了 B1 事件之后,就会与 A 最近发送的事件 A2 结合成 A2B1,这样只有后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。
因为 combineLatestDelayError() 就是多了延迟发送 onError() 功能,这里就不再赘述了。
7、reduce
Observable.just(3, 7, 10, 2)
.reduce(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int res = integer + integer2;
Log.d(TAG, "====================integer " + integer);
Log.d(TAG, "====================integer2 " + integer2);
Log.d(TAG, "====================res " + res);
return res;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "==================accept " + integer);
}
});
打印:
====================integer 3
====================integer2 7
====================res 10
====================integer 10
====================integer2 10
====================res 20
====================integer 20
====================integer2 2
====================res 22
==================accept 22
说明:与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。从结果可以看到,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止,把最终的结果通过观察者一次输出。
8、collect
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
@Override
public ArrayList < Integer > call() throws Exception {
return new ArrayList < > ();
}
},
new BiConsumer < ArrayList < Integer > , Integer > () {
@Override
public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer < ArrayList < Integer >> () {
@Override
public void accept(ArrayList < Integer > integers) throws Exception {
Log.d(TAG, "===============accept " + integers);
}
});
打印:
===============accept [1, 2, 3, 4]
说明:
将数据收集到数据结构当中。
9、startWith() & startWithArray()
Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "================accept " + integer);
}
});
打印:
================accept 1
================accept 2
================accept 3
================accept 4
================accept 5
================accept 6
================accept 7
说明:在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
10、count
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "=======================aLong " + aLong);
}
});
打印:
=======================aLong 3
说明:返回被观察者发送事件的数量。