响应式编程
RxJava提供了响应式编码规范,而RxAndroid是专供Android平台的RxJava(只是针对平台增加了少量类),一般Android开发者口中的RxJava指的便是RxAndroid。关于响应式编程的概念这里不多说,网上一大堆,我就简单说明下响应式编程与传统编程的区别。如果逻辑B依赖于逻辑A,逻辑C依赖于逻辑B,那么:
- 传统编程中的作法是在A执行完成后去执行B,B执行完毕后执行C,除了A、B、C本身的逻辑外,这部分由依赖产生的先后执行逻辑也是由开发者编写。
- 响应式编程中,会先将A、B、C做个依赖绑定,A->B->C,就像水流一样,从A流到B,再从B流到C,依赖产生的先后执行逻辑由语言或者库提供支持,开发者只需要编写A、B、C本身的逻辑以及告诉提供方三者间的关系即可。换句话说,就是B会自动响应A的执行结果,C会自动响应B的执行结果。
响应式编程可以很优雅地处理任务间(或者业务间)的依赖关系,而这关系就像流一样,并且多用于异步场景,且由于大家熟知的响应式编程库RxJava的主要应用场景也是异步,主要应用手段是流,因此不少文章简单粗暴地将响应式编程等同是异步+流,这个是比较片面的。
示例
我们来看个具体的例子,加深下理解。本例中需要处理三个任务,分别为A、B、C,这三个任务都需要在工作线程中执行,A执行完以后延迟1S执行B,B执行完以后延迟1S执行C。
以下为公共代码,定义了A、B、C三个任务以及定时器:
public class TaskA implements Runnable {
@Override
public void run() {
// do something
}
}
public class TaskB implements Runnable {
@Override
public void run() {
// do something
}
}
public class TaskC implements Runnable {
@Override
public void run() {
// do something
}
}
public class Timer {
private static final Timer INSTANCE = new Timer();
private Handler mHandler;
private Timer() {
HandlerThread thread = new HandlerThread("timer");
thread.start();
mHandler = new Handler(thread.getLooper());
}
public static Timer get() {
return INSTANCE;
}
public void post(Runnable runnable) {
mHandler.post(runnable);
}
public void postDelayed(Runnable runnable, long delay) {
mHandler.postDelayed(runnable, delay);
}
}
传统编程
Timer.get().post(new Runnable() {
@Override
public void run() {
new TaskA().run();
Timer.get().postDelayed(new Runnable() {
@Override
public void run() {
new TaskB().run();
Timer.get().postDelayed(new Runnable() {
@Override
public void run() {
new TaskC().run();
}
}, 1000);
}
}, 1000);
}
});
传统编程方式的问题非常突出,嵌入层次太多,随着依赖关系的增多以及复杂化,这样的代码会变得极其臃肿且不易阅读和维护。
响应式编程
首先我们需要编写一个支持响应式编程规范的库,代码如下(这个类很多情况没有考虑到,仅仅是为了演示用):
public class StreamTimer implements Runnable {
private List<Task> mTasks = new LinkedList<>();
public StreamTimer() {
}
public StreamTimer next(Runnable runnable) {
return next(runnable, 0);
}
public StreamTimer next(Runnable runnable, long delay) {
Task task = new Task(runnable, delay);
mTasks.add(task);
return this;
}
public void startup() {
startNextTimer();
}
private void startNextTimer() {
if(mTasks.isEmpty()) {
return;
}
Task task = mTasks.get(0);
Timer.get().postDelayed(this, task.delay);
}
@Override
public void run() {
Task task = mTasks.remove(0);
task.runnable.run();
startNextTimer();
}
private class Task {
Runnable runnable;
long delay;
Task(Runnable runnable, long delay) {
this.runnable = runnable;
this.delay = delay;
}
}
}
执行A、B、C任务的代码如下:
new StreamTimer()
.next(new TaskA())
.next(new TaskB(), 1000)
.next(new TaskC(), 1000)
.startup();
next
表示新增一个任务且需要在上一次任务执行完毕后才执行。以上代码非常简洁且可读性很强,任务间的依赖关系非常清晰,拓展也非常简单,新增任务时只需要在合适的地方插入next
方法即可。
比起上述例子,RxJava的功能要强大得多,也复杂得多,上述例子只是为了让新手能快速掌握响应式编程及流式调用(也称为链式调用),接下去我们开始讲解RxJava。
RxAndroid基本使用
使用RxAndroid需要在build.gradle中加入如下依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
compile 'io.reactivex.rxjava2:rxjava:2.2.7'
RxAndroid也直接去https://github.com/ReactiveX/RxAndroid下载源码,里面就不到10个类。
Observable和Observer
- Observable:被观察者,用来处理事件的派发。
- Observer:观察者,观察目标为Observable,Observable派发出来的事件将被它处理,一个Observable可以有多个Observer。当Observable有变化时,Observer能够立即响应这些变化。
熟悉观察者模式的同学应该对这两者有非常深刻的认识了,它们是RxJava中最基础的东西,RxJava中其他的对象、方法、操作符都是围绕这二者进行或拓展的。来看下最简单的例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("onSubscribe");
}
@Override
public void onNext(String s) {
log("onNext:" + s);
}
@Override
public void onError(Throwable e) {
error("onError", e);
}
@Override
public void onComplete() {
log("onComplete");
}
};
observable.subscribe(observer);
日志输出如下:
onSubscribe
onNext:A
onNext:B
onNext:C
onComplete
来看下上述例子中Observable
相关的对象和方法:
- 使用
Observable.create
创建Observable
对象。 -
ObservableEmitter
为发射器,Observable
使用它发射事件给所有Observer
。 - 使用
Observable.subscribe
添加一个Observer
。
接着来看下Observer
的几个方法:
- onSubscribe:在订阅
observable
时回调,可以在这里调用Disposable.dispose
取消订阅或者将Disposable
对象保存起来以便在后续某个时刻取消订阅。 - onNext:在
ObservableEmitter.onNext
执行后回调,onNext
表示的是整个响应链中的一环,在这里处理响应链中的其中一个任务,可以多次调用。 - onComplete:在
ObservableEmitter.onComplete
执行后回调,表示任务已全部完成,可以在这里做收尾工作。 - onError:在
ObservableEmitter.onError
执行后或者链中任一环节出现异常时回调,表示任务执行失败。
除了onSubscribe
,其它几个方法有如下特点:
- 和
ObservableEmitter
中的同名方法一一对应,在ObservableEmitter
的同名方法执行后回调。 -
onComplete
和onError
互斥,两者只能触发其中之一,且触发后onNext
便不会再触发。 -
onComplete
触发后,后续ObservableEmitter
调用任何方法都不会再生效。 -
onError
触发后,如果ObservableEmitter
再调用onError
或者onComplete
,RxJava会抛出异常,开发者需要自行保证唯一性。(处理方式为什么不同onComplete
?)
需要特别注意的,ObservableOnSubscribe.subscribe
方法在每次有新的Observer
加入时,都会在Observer.onSubscribe
回调之后触发,这就保证了所有的Observer
都能接收到事件。
subscribe
在上面提到了通过subscribe
为Observable
和Observer
建立了绑定关系,我们来看下方法原型:
void subscribe(Observer<? super T> observer)
除此之外,subscribe
还有很多重载方法,我们来看下所有的方法原型:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> onNext);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
以上方法最终都会调用到参数最多的那个方法,而该方法内部又调用了subscribe(Observer<? super T> observer)
,这些方法相对来说更为简洁易用。可以发现新增了两个类,分别是Consumer
和Action
,其中Observer
的onNext
、onError
和onSubscribe
方法被Consumer.accept
方法取代,onComplete
方法被Action.run
方法取代。因此,如果我们仅仅只关心Observer
的其中一个或多个回调,那么便可以通过Consumer
或Action
来代替Observer
注册到Observable
中。如上述的例子中,如果我们只关心onNext
,那么可以这么使用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
});
observable.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
log("onNext:" + s);
}
}, Functions.<Throwable>emptyConsumer(),
new Action() {
@Override
public void run() throws Exception {
log("onComplete");
}
}
);
使用lambda表达式之后的代码如下:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
);
observable.subscribe(
(String s) -> log("onNext:" + s),
Functions.<Throwable>emptyConsumer(),
() -> log("onComplete")
);
代码显得简洁优雅易读得多,实际上RxJava是为响应式编程和函数式编程而生,因此使用lambda表达式才能完美的使用RxJava。
Android使用lambda表达式需要使用jdk1.8,且gradle版本需要4.0以上(也可能是3.x某个版本,记不住了,4.0以上准没错),在build.gradle中加入如下代码:
android { compileOptions { sourceCompatibility > JavaVersion.VERSION_1_8 targetCompatibility > JavaVersion.VERSION_1_8 } }
线程调度
以下称Observer.onXXX
回调时所在线程为Observer
工作线程,Observable
发射事件时所在线程为Observable
工作线程。默认情况下,Observer
和Observable
工作线程是同一个,该线程即调用Observable.subscribe
时所在的线程。然而,异步调用才是RxJava的核心应用场景,下面我们来看下如何改变Observer
和Observable
的工作线程。使用Observable.subscribeOn
配置Observable
工作线程,使用Observable.observeOn
配置Observer
工作线程。很多情况下,置Observable
工作线程位于子线程中,因为可能存在网络请求、数据存取等耗时操作;而Observer
工作线程位于主线程,因为接收到事件后需要刷新UI。下面来看下这种场景下的应用示例:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
);
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
(String s) -> log("onNext:" + s + " thread=" + Thread.currentThread().getName()),
Functions.<Throwable>emptyConsumer(),
() -> log("onComplete thread=" + Thread.currentThread().getName())
);
日志输出如下:
emit thread=RxNewThreadScheduler-2
onNext:A thread=main
onNext:B thread=main
onNext:C thread=main
onComplete thread=main
可以看到线程调度确实如我们所期望的了。需要特别强调的,subscribeOn
和observeOn
返回的不是原本的Observable
对象,因此如果没有采用链式调用,在调用这两个方法之后必须重新赋值给Observable
对象,如:
// 错误调用
observable.subscribeOn(xxx)
.observeOn(xxx);
observable.subscribe(xxx);
// 正确调用
observable = observable.subscribeOn(xxx)
.observeOn(xxx);
observable.subscribe(xxx);
如果重复调用subscribeOn
和observeOn
会怎么样呢,我们来看段代码:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
}
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
日志输出如下:
emit thread=main
onNext1:A thread=RxSingleScheduler-1
可以得出如下临时结论:
-
subscribeOn
以第一次调用为准。 -
observeOn
以最后一次调用为准。
我们再来看下更复杂的例子:
Observable<String> observable = Observable.create(
(ObservableEmitter<String> emitter) -> {
log("emit thread=" + Thread.currentThread().getName());
emitter.onNext("A");
}
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.single())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
observable = observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io()) ;
observable.subscribe((String s) -> log("onNext2:" + s + " thread=" + Thread.currentThread().getName()));
日志输出如下:
emit thread=main
emit thread=main
onNext1:A thread=RxSingleScheduler-1
onNext2:A thread=RxCachedThreadScheduler-2
现在,我们可以得出最终结论了:
-
subscribeOn
以第一次调用为准。 -
observeOn
以调用subscribe
前的最后一次调用为准,每个subscribe
单独计算。
由此可知,我们可以让不同的Observer
在不同线程中调度。
RxJava使用Scheduler
来表示线程调度,上面提到的Schedulers.newThread()
和AndroidSchedulers.mainThread()
都是由RxJava提供的Scheduler
实现类。一般我们不需要手动去实现Scheduler
,而是通过Schedulers
或者AndroidSchedulers
(Android专用)获取。下面分别来看下二者所提供的创建能力。
Schedulers
newThread
大致等同于new Thread(runnable).start();
,线程数没有上限,除了测试场景一般不会用到它。
io
用于I/O操作场景,线程数没有上限。与newThread
比较相似,区别在于该调度器的内部使用了一个无数量上限的线程池,可以复用空闲的线程,因此效率更高。
computation
用于计算场景,计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,因此不要把I/O操作放在这里。该类型的Scheduler
使用固定数量的线程池,数量为处理器核数。除了阻塞(包括I/O操作、wait
等)外,其他操作都可以使用该调度器,不过通常用处理事件循环,大数据运算等。
single
单线程调度,所有任务都需要排队依次运行。
trampoline
任务在当前线程运行。
from(Executor executor)
使用指定的线程池调度。
AndroidSchedulers
mainThread
任务在主线程上运行。
from(Looper looper)
任务在指定Looper上调度。