RxJava 博大精深,想要入门和进阶,操作符是一个切入点。 所以,我们希望寻找一种可以把操作符写得比较爽,同时可以快速验证输入输出是否准确的玩法。思路有以下两点:
- 使用 UT(JUnit Test) 来对每一个操作符进行实现,如此一来可以脱离 Android 平台的依赖,专注于操作符本身。
- 对于每一种操作符,使用 RX Marbles ,或者 RxJava 官方的弹珠图(marble diagrams)进行实现。
比如下面两张图,分别来自 RX Marbles 和官方的弹珠图,我们要做的就是用 UT 有目的性、精确地实现这两张图的输入和输出。
所谓有目的性、精确地输入输出,意思就是根据所有操作符的弹珠图的每条数据流,以及操作符的含义,严格按照图片表达的意思进行代码的实现。通过这种方强迫症一般的方式,对理解操作符和 RxJava 的体系有很大的帮助。
(一)预备知识
我们希望把精力专注于操作符的实现,而不是单元测试的技巧,但由于 RxJava 的异步特性,有很多操作符是跟线程相关的,因此我们要先掌握单元测试中如何对线程进行处理的预备知识。
让测试线程最晚结束
在线程相关的测试代码中,有个很棘手的现象是:测试线程早于子线程执行完毕,如下代码:
@Test
public void test_thread_early() {
//测试线程启动
System.out.println("测试线程-start");
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子线程-start");
OperatorUtils.sleep(3000);
System.out.println("子线程-end");
}
}).start();
//测试线程结束后,子线程还未执行完毕,因此子线程无法完整的输出测试结果
System.out.println("测试线程-end");
}
在上述代码中,测试线程瞬间就执行完毕了,而子线程需要执行3s,测试线程早于子线程执行完毕,因此子线程将无法完整的执行,因此,输出的结果是:
测试线程-start
测试线程-end
子线程-start
于此对应的,我们来看看 RxJava 操作符的例子,通过 timer
操作符实现延迟3s发送数据:
@Test
public void test_thread_early_observable() {
System.out.println("测试线程-start,所在线程:" + Thread.currentThread().getName());
//消息源在Schedulers.computation()线程中执行,3s后执行,此时测试线程已经执行完毕,无法正常输出结果
Observable.timer(3, TimeUnit.SECONDS)
.subscribe(num -> {
System.out.println("Observable和Subscriber所在线程:" + Thread.currentThread().getName());
System.out.println("获取订阅数据:" + num);
});
System.out.println("测试线程--end");
}
与上面的代码一样,由于测试线程早早的结束了,timer
操作符所在的线程 Schedulers.computation()
将无法完整地执行完毕,因此输出的结果是:
测试线程-start,所在线程:main
测试线程-end
如果无法保证所有线程都执行完毕,便无法得到预期的输出结果。那么,如何解决这个问题?有种最笨的方法便是让测试线程成为最晚结束的线程,我们为测试线程增加类似于 Thread.sleep(4000)
的逻辑,便可保证以上两份代码可以在正常输出。(此文不希望涉及太多的测试技巧,如果需要更严谨和更强大的线程异步测试,可以参考些第三方框架,如 awaitility)
使用TestScheduler操纵时间
除了这种笨方法之外,RxJava 提供了 TestScheduler
,通过这个调度器可以实现对时间的操纵。
对于上文提到的 timer
操作符,通过testScheduler.advanceTimeBy(3, TimeUnit.SECONDS)
可以将时间提前3s,此时测试线程和 timer
操作符所在的线程均可顺利的执行完毕,完整代码如下:
@Test
public void test_thread_with_TestScheduler() {
TestScheduler testScheduler = Schedulers.test();
System.out.println("测试线程:" + Thread.currentThread().getName());
//指定调度器
Observable.timer(3, TimeUnit.SECONDS, testScheduler)
.subscribe(num -> {
System.out.println("Observable和Subscriber线程:" + Thread.currentThread().getName());
System.out.println("获取订阅数据:" + num);
});
//将时间提前了3s
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
}
聚合操作符的线程处理
很多聚合操作符,如 merge
、zip
等,需要在多个不同的线程中构造不同的数据流,从而体现数据流发送的先后关系,以及所对应的不同的输出结果。如何让多个线程完整的执行完毕?结合上文所讲的让测试线程最晚结束以及**使用 TestScheduler
**便可做到。笔者在下文的聚合操作符一节中将会具体讲解。
有了这些预备知识,基本上可以实现 RxJava 的所有操作符,接下来针对不同类型的操作符分别举例一二进行讲解,如需完整代码,请前往Github:
https://github.com/geniusmart/RxJavaOperatorsUTSample
(二)不同类型的操作符实现
interval
interval
作为创建型的操作符,具备间隔一段时间发送数据的能力,是我们写其他操作符的基础,因此先来讲解下interval
。
这张图要表达的意思很简单,自顶而下的分析如下:
- 操作符:由于
interval
操作符是创建型的,因此直接调用操作符来产生数据流,根据 api 参数,需定义其间隔时长,这个数值我们设置为100ms。 - 输入:执行了
Observable.interval()
之后,每间隔指定时间将输出0、1、2、3……
的无穷数据(注:通过弹珠图,可以看到第一个数据也是有间隔时间的)。 - 输出:即数据消费者,在 RxJava 中体现为
Subscriber
。这张图里并没有画出输出的数据流,为了观察输出,我们自定义订阅者。 - 实现思路:
interval
默认在Schedulers.computation()
线程中执行,执行的时间将会超过测试线程,根据上文的「预备知识」这一节所述,我们使用TestScheduler
来操纵时间,比如,为了输出4个数据,interval
需要4个单位的间隔时间(400ms),将时间提前400ms可输出我们想要的结果。具体实现如下:
@Test
public void interval() {
Observable.interval(100, TimeUnit.MILLISECONDS, mTestScheduler)
.subscribe(mList::add);
//时间提早400ms前
mTestScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L));
//时间提早(400 + 200)ms前
mTestScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
assertEquals(mList, Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L));
}
以此类推,range
、just
、repeat
等创建型的操作符均可按照这种方式实现弹珠图,这类操作符的实现代码请查看:CreatingOperatorsTest.java
delay
delay
是工具类型的操作符,可以对数据流进行延时发送。
与创建型操作符 interval
弹珠图不一样,delay
有输入和输出两条数据流,中间是操作符的转换过程。输入需借助创建型操作符实现(如 just
),输出则由订阅者完成。
- 输入:使用比较简单的
just
操作符,即Observable.just(1, 2, 1)
。 - 输出:经过
delay
的变换,在延迟指定的时间之后,输出与输入一致的输入流。 - 实现思路:此操作符也是与时间相关的操作符,通用
TestScheduler
来操纵时间,并且验证「延时时间内」和「超过延时时间」是否有数据流输出。代码如下:
@Test
public void delay() {
Observable.just(1, 2, 1)
.delay(3000, TimeUnit.SECONDS, mTestScheduler)
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(2000, TimeUnit.SECONDS);
System.out.println("after 2000ms,result = " + mList);
assertTrue(mList.isEmpty());
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
System.out.println("after 3000ms,result = " + mList);
assertEquals(mList, Arrays.asList(1, 2, 1));
}
工具型的操作符还有非常多,比如变换线程的 observeOn
和 subscribeOn
,比如 Observable
生命周期的事件监听操作符 doOnSubscribe
、doOnNext
、doOnCompleted
,延迟订阅的 delaySubscription
等,这类型的操作符实现请查看:UtilityOperatorsTest.java。
amb
amb
是条件型的操作符(Conditional Operators),满足一定的条件数据流才会开始发送,而 amb
需要满足的条件便是:多个数据流中最早产生数据的数据流进行发送,弹珠图也明确地表达出了这层含义。
- 输入:这里有3条数据流,开始发送数据的时间各不一样,通过之前的操作符讲解,这里使用
just
+delay
即可实现。 - 输出:经过
amb
变化后,输出了最早发送数据的数据流,即第二条数据流。 - 实现思路:通过
delay
操作符分别延时500s、200s和1000s,然后通用TestScheduler
将时间提早1000s,订阅数据流后,验证下输出。代码如下:
@Test
public void amb() {
Observable<Integer> o1 = Observable.just(20, 40, 60)
.delay(500, TimeUnit.SECONDS, mTestScheduler);
Observable<Integer> o2 = Observable.just(1, 2, 3)
.delay(200, TimeUnit.SECONDS, mTestScheduler);
Observable<Integer> o3 = Observable.just(0, 0, 0)
.delay(1000, TimeUnit.SECONDS, mTestScheduler);
Observable.amb(o1, o2, o3)
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(1, 2, 3));
}
以此类推,更多条件行的操作符,如 skipUntil
、takeUntil
等,请查看ConditionalAndBooleanOperatorsTest.java。
buffer
buffer
是转换型的操作符,他可以将单个数据缓存起来,批量发送,发送的数据类型是 List
。
上图要表达的意思很明确,发送6个数据,每三个做一次缓存,然后批量发送,代码实现如下:
@Test
public void buffer() {
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3)
.subscribe(mList::add);
System.out.println(mList);
List<List<Integer>> exceptList = Arrays.asList(Arrays.asList(1, 2, 3),
Arrays.asList(4, 5, 6));
assertEquals(mList, exceptList);
}
flatMap 和 concatMap
接下来,来对比一组转换型的操作符:flatMap
和 concatMap
,这两者充分体现了 marble diagrams 给我们带来的各种有价值的信息。以下是这两个操作符的 marble diagrams:
- 输入:两者完全一模一样的输入,这里要重点关注弹珠的颜色,颜色代表了数据流的顺序。
- 输出:输入的数据流经过变换后,每份数据都变成了两份,此外,**
flatMap
变换后,绿色的◇和蓝色◇是交叉的,而concatMap
则保持了与输入一致的顺序**,这个细节决定了我们如何来实现这两张图。 - 实现思路:在
flatMap
或concatMap
之后,3个数据变成了6个数据,假设输入为1、2、3
,则输出为1、1、2、2、3、3
,我们要想办法让变换后的输出有时间差,即按照1、1、2、3、2、3
的顺序输出,思考再三,interval
可以实现这个场景,将原始的输入流1、2、3分别作为interval
时间间隔的变量,来模拟交叉的输出。具体实现如下:
@Test
public void flatMap() {
Observable.just(1, 2, 3)
.flatMap((Func1<Integer, Observable<?>>) num -> Observable.interval(num - 1,
TimeUnit.SECONDS, mTestScheduler)
.take(2)
.map(value -> num + "◇"))
.subscribe(mList::add);
mTestScheduler.advanceTimeBy(100, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList("1◇", "1◇", "2◇", "3◇", "2◇", "3◇"));
System.out.println(mList);
}
上述代码中,只需把 flatMap
修改为 concatMap
,便可获得 "1◇", "1◇", "2◇", "2◇", "3◇", "3◇"
的数据流,与弹珠图所要表达的意思完全一致。通过这个例子,我们可以感受到,弹珠图包含了操作符的诸多细节,严谨地实现弹珠图的输入输出,可以更深入的了解操作符。
更多转换型的操作符的实现,如 switchMap
、groupBy
、window
等,请查看TransformingOperatorsTest.java。
debounce
debounce
是过滤型的操作符,所以会按一定的规则过滤数据流。这个规则是:Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。
- 输入:对于输入的数据流可以这样定义:先产生
1
的数据,间隔500ms后产生2、3、4、5
,再间隔500ms,产生6
,使用create
操作符结合Thread.sleep()
来实现输入。 - 输出:
debounce
的间隔时间设置为400ms,在三段间隔周期内,将依次输出 1、5、6 。具体代码如下:
@Test
public void debounce() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
OperatorUtils.sleep(500);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onNext(5);
OperatorUtils.sleep(500);
subscriber.onNext(6);
subscriber.onCompleted();
}
})
.subscribeOn(mTestScheduler)
.doOnNext(System.out::println)
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(mList::add);
// 测试线程将时间提早10ms,可以保证create操作符顺利执行完毕
mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
System.out.println(mList);
assertEquals(mList, Arrays.asList(1, 5, 6));
}
以此类推,按照这种方式可以实现 sample
、throttleFirst
、throttleLast
等过滤型的操作符,具体代码请查看:FilteringOperatorsTest.java 。
merge
merge
是聚合型的操作符。既然是聚合,因此需要2条以上的数据流,聚合之后,输出一条全新的数据流。
- 输入:两条数据流,并且要重点关注下数据发送的顺序。
- 输出:根据输入的数据顺序,原封不动的合并之后,进行输出。
- 实现思路:两条数据流均使用
interval
创建,第一条的间隔时间定义为5s,第二条数据流在第一条数据流产生了三个数据之后才发出第一个数据,因此时间间隔设置为18s,具体实现如下:
@Test
public void merge() {
Observable<Long> observable1 = Observable.interval(5, TimeUnit.SECONDS, mTestScheduler)
.take(5)
.map(aLong -> (aLong + 1) * 20)
.doOnNext(System.out::println);
Observable<Long> observable2 = Observable.interval(18, TimeUnit.SECONDS, mTestScheduler)
.take(2)
.map(aLong -> 1L)
.doOnNext(System.out::println);
Observable.merge(observable1, observable2).subscribe(mList::add);
mTestScheduler.advanceTimeBy(1000, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(20L, 40L, 60L, 1L, 80L, 100L, 1L));
}
combineLatest
combineLatest
是聚合型的操作符, 其聚合的规则是:每条数据流中的每个数据都要与另外一条数据流已发送的最近的数据进行两两结合。
构造出如弹珠图所示的两条数据流,重点在于制造时间差和多线程:
- 使用
create
+Thread.sleep()
来制造数据流产生的时间差。 - 让两条数据流在不同的线程中发送数据,使用
subscribeOn
操作符可以实现线程的调度。
- 首先是第一条数据流的构造,让其在
TestScheduler.test()
线程中产生数据(其实便是测试线程,增加了操纵时间的能力),代码如下:
Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
OperatorUtils.logThread("observable1");
subscriber.onNext(1);
OperatorUtils.sleep(500);
subscriber.onNext(2);
OperatorUtils.sleep(1500);
subscriber.onNext(3);
OperatorUtils.sleep(250);
subscriber.onNext(4);
OperatorUtils.sleep(500);
subscriber.onNext(5);
subscriber.onCompleted();
}
}).subscribeOn(mTestScheduler).doOnNext(System.out::println);
- 其次是第二条数据流,将其生产数据的线程定义为
Schedulers.newThread()
,代码如下:
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
OperatorUtils.logThread("observable2");
OperatorUtils.sleep(250);
subscriber.onNext("A");
OperatorUtils.sleep(300);
subscriber.onNext("B");
OperatorUtils.sleep(500);
subscriber.onNext("C");
OperatorUtils.sleep(100);
subscriber.onNext("D");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread()).doOnNext(System.out::println);
- 前面2点完成了输入,接下来就是进行聚合变换,以及消费数据,产生输出,并验证与弹珠图的输出一致。
(Func2<Integer, String, Object>) (integer, s) -> integer + s).subscribe(mList::add);
//测试线程提前一定时间,让observable1能顺利开始发送数据
mTestScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
System.out.println(mList);
assertEquals(mList, Arrays.asList("1A", "2A", "2B", "2C", "2D", "3D", "4D", "5D"));
与 merge
和 combineLatest
类似,我们可以依次实现 zip
、switchOnNext
、 withLatestFrom
等聚合型操作符,并了解他们之间的区别。聚合型的操作符所有代码请前往:CombiningOperatorsTest.java 。
connect
之前介绍的创建型操作符均创建了 cold 类型的 Observable
,其特点是只有订阅者订阅数据时,数据流才会开始发送数据。于此相反,hot 类型的 Observable
不管有没有订阅者,都可以直接开始发送数据。publish
和 connect
是与 hot Observable
相关的一类操作符。
这张弹珠图并不好理解,但如果能完整实现,对 hot Observable
的便能了然于胸。这张图中,输出有三条数据流,代表有三个订阅者,但是订阅的时间不一致,最终接收到的数据也不一致,此外,这张图中,体现了 publish
和 connect
两种操作符。
- 输入:数据流的产生比较清晰,用上文讲过的创建型操作符即可实现。由于需要时间差,因此采用
interval
来产生数据流,时间间隔定义为3s。此外,interval
产生的数据流是 cold 类型的,如何由 cold 变成 hot,其实这便是publish
操作符要做的事情。 - 输出:输出的信息量比较大,我们需要好好捋一捋:
- 首先可以明确有三个订阅者,且订阅的时间各不一样。延时订阅可以使用
delaySubscription
操作符。 - 第一个订阅者即刻订阅,不延时,而他在订阅时,数据流还未开始发送数据,因此可以订阅到完整的数据流。
- 第一个订阅者的数据流中有个操作符不可忽视——
connect
,他决定着Observable
何时开始发送数据。根据图中所示,将时间定义为2秒后。 - 第二个订阅者在数据发送了2个之后才开始订阅,因此将订阅时间设置为延迟6秒订阅。他将只能订阅到最后一个数据。
- 第三个订阅者与第一个区别并不大,我们将他定义为延时1秒后订阅。
完整的代码实现如下:
public void connect() {
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
List<Integer> list3 = new ArrayList<>();
//构造1,2,3的数据流,每隔3s发射数据
ConnectableObservable<Integer> connectableObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
OperatorUtils.sleep(3000);
subscriber.onNext(2);
OperatorUtils.sleep(3000);
subscriber.onNext(3);
}
}).publish();
System.out.println("Subscriber1-0s后开始订阅数据");
//立刻订阅完整的数据流
connectableObservable.doOnNext(num -> System.out.println("Subscriber1-->" + num))
.subscribe(list1::add);
//延迟6s后再订阅,将只订阅到3的数据流
connectableObservable.delaySubscription(6, TimeUnit.SECONDS, Schedulers.newThread())
.doOnSubscribe(()->{
System.out.println("Subscriber2-6s后开始订阅数据");
})
.doOnNext(num -> System.out.println("Subscriber2-->" + num))
.subscribe(list2::add);
//延迟1s后再订阅,将只订阅到完整的数据流
connectableObservable.delaySubscription(1, TimeUnit.SECONDS, Schedulers.newThread())
.doOnSubscribe(()->{
System.out.println("Subscriber3-1s后开始订阅数据");
})
.doOnNext(num -> System.out.println("Subscriber3-->" + num))
.subscribe(list3::add);
//延时2s执行connect()
OperatorUtils.sleep(2000);
System.out.println("Observable 2s后触发connect()");
connectableObservable.connect();
assertEquals(list1, Arrays.asList(1, 2, 3));
assertEquals(list2, Collections.singletonList(3));
assertEquals(list3, Arrays.asList(1, 2, 3));
}
以此类推,可以实现其他与 hot Observable
相关的操作符,如 refCount、replay、cache 等,具体代码请查看ConnectableOperatorsTest.java 。
其他类型的操作符
除了上文介绍的7种不同类型的操作符之外,还有错误处理类型(如 retry
、retryWhen
)、背压类型(如 onBackpressureBuffer
)、Convert 类型(如toList
、toMap
)的操作符未涉及到,以及一些弹珠图无法完全诠释操作符本身的诸多细节的讲解,篇幅所限,请移步这篇文章查看。
(三)本文代码
本文的所有代码请前往这个地址查看:
https://github.com/geniusmart/RxJavaOperatorsUTSample
目前已经实现了的弹珠图(marble diagrams)的操作符种类如下:
(四)结束语
授人以鱼不如授人以渔。本文侧重介绍一种学习 RxJava 、全面且深入了解操作符的方式,总结起来有如下关键点:
- 使用单元测试实现,消除对 Android 的依赖,且不要涉及太多的测试技巧,专注于操作符的实现。
- 有目的性且严谨地实现输入输出。每个操作符,读懂 marble diagrams ,并通过代码实现。
- marble diagrams 图片来自于RX Marbles ,或者 RxJava 官方 。
- 一些有更深层次含义或细节的,marble diagrams 无法完整诠释的,如
defer
,retryWhen
,查阅更多的文章实现。这部分的讲解请移步到另外一篇文章:《使用 UT 玩转 defer 和 retryWhen》。