响应式编程
RxJava提供了响应式编码规范,而RxAndroid是专供Android平台的RxJava(只是针对平台增加了少量类),一般Android开发者口中的RxJava指的便是RxAndroid。关于响应式编程的概念这里不多说,网上一大堆,我就简单说明下响应式编程与传统编程的区别。如果逻辑B依赖于逻辑A,逻辑C依赖于逻辑B,那么:
- 传统编程中的作法是在A执行完成后去执行B,B执行完毕后执行C,除了A、B、C本身的逻辑外,这部分由依赖产生的先后执行逻辑也是由开发者编写。
- 响应式编程中,会先将A、B、C做个依赖绑定,A->B->C,就像水流一样,从A流到B,再从B流到C,依赖产生的先后执行逻辑由语言或者库提供支持,开发者只需要编写A、B、C本身的逻辑以及告诉提供方三者间的关系即可。换句话说,就是B会自动响应A的执行结果,C会自动响应B的执行结果。
响应式编程可以很优雅地处理任务间(或者业务间)的依赖关系,而这关系就像流一样,并且多用于异步场景,且由于大家熟知的响应式编程库RxJava的主要应用场景也是异步,主要应用手段是流,因此不少文章简单粗暴地将响应式编程等同是异步+流,这个是比较片面的。
示例
我们来看个具体的例子,加深下理解。本例中需要处理三个任务,分别为A、B、C,这三个任务都需要在工作线程中执行,A执行完以后延迟1S执行B,B执行完以后延迟1S执行C。
以下为公共代码,定义了A、B、C三个任务以及定时器:
public class TaskA implements Runnable {
    @Override
    public void run() {
        // do something
    }
}
public class TaskB implements Runnable {
    @Override
    public void run() {
        // do something
    }
}
public class TaskC implements Runnable {
    @Override
    public void run() {
        // do something
    }
}
public class Timer {
    private static final Timer INSTANCE = new Timer();
    private Handler mHandler;
    private Timer() {
        HandlerThread thread = new HandlerThread("timer");
        thread.start();
        mHandler = new Handler(thread.getLooper());
    }
    public static Timer get() {
        return INSTANCE;
    }
    public void post(Runnable runnable) {
        mHandler.post(runnable);
    }
    public void postDelayed(Runnable runnable, long delay) {
        mHandler.postDelayed(runnable, delay);
    }
}
传统编程
Timer.get().post(new Runnable() {                          
       @Override                                            
       public void run() {                                  
           new TaskA().run();                               
           Timer.get().postDelayed(new Runnable() {         
               @Override                                    
               public void run() {                          
                   new TaskB().run();                       
                   Timer.get().postDelayed(new Runnable() { 
                       @Override                            
                       public void run() {                  
                           new TaskC().run();               
                       }                                    
                   }, 1000);                                
               }                                            
           }, 1000);                                        
       }                                                    
   });                                  
传统编程方式的问题非常突出,嵌入层次太多,随着依赖关系的增多以及复杂化,这样的代码会变得极其臃肿且不易阅读和维护。
响应式编程
首先我们需要编写一个支持响应式编程规范的库,代码如下(这个类很多情况没有考虑到,仅仅是为了演示用):
public class StreamTimer implements Runnable {
    private List<Task> mTasks = new LinkedList<>();
    public StreamTimer() {
    }
    public StreamTimer next(Runnable runnable) {
        return next(runnable, 0);
    }
    public StreamTimer next(Runnable runnable, long delay) {
        Task task = new Task(runnable, delay);
        mTasks.add(task);
        return this;
    }
    public void startup() {
        startNextTimer();
    }
    private void startNextTimer() {
        if(mTasks.isEmpty()) {
            return;
        }
        Task task = mTasks.get(0);
        Timer.get().postDelayed(this, task.delay);
    }
    @Override
    public void run() {
        Task task = mTasks.remove(0);
        task.runnable.run();
        startNextTimer();
    }
    private class Task {
        Runnable runnable;
        long delay;
        Task(Runnable runnable, long delay) {
            this.runnable = runnable;
            this.delay = delay;
        }
    }
}
执行A、B、C任务的代码如下:
new StreamTimer()           
   .next(new TaskA())       
   .next(new TaskB(), 1000) 
   .next(new TaskC(), 1000) 
   .startup();              
next表示新增一个任务且需要在上一次任务执行完毕后才执行。以上代码非常简洁且可读性很强,任务间的依赖关系非常清晰,拓展也非常简单,新增任务时只需要在合适的地方插入next方法即可。
比起上述例子,RxJava的功能要强大得多,也复杂得多,上述例子只是为了让新手能快速掌握响应式编程及流式调用(也称为链式调用),接下去我们开始讲解RxJava。
RxAndroid基本使用
使用RxAndroid需要在build.gradle中加入如下依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
compile 'io.reactivex.rxjava2:rxjava:2.2.7'
RxAndroid也直接去https://github.com/ReactiveX/RxAndroid下载源码,里面就不到10个类。
Observable和Observer
- Observable:被观察者,用来处理事件的派发。
- Observer:观察者,观察目标为Observable,Observable派发出来的事件将被它处理,一个Observable可以有多个Observer。当Observable有变化时,Observer能够立即响应这些变化。
熟悉观察者模式的同学应该对这两者有非常深刻的认识了,它们是RxJava中最基础的东西,RxJava中其他的对象、方法、操作符都是围绕这二者进行或拓展的。来看下最简单的例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onNext("C");
        emitter.onComplete();
    }
});
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        log("onSubscribe");
    }
    @Override
    public void onNext(String s) {
        log("onNext:" + s);
    }
    @Override
    public void onError(Throwable e) {
        error("onError", e);
    }
    @Override
    public void onComplete() {
        log("onComplete");
    }
};
observable.subscribe(observer);
日志输出如下:
onSubscribe
onNext:A
onNext:B
onNext:C
onComplete
来看下上述例子中Observable相关的对象和方法:
- 使用Observable.create创建Observable对象。
- 
ObservableEmitter为发射器,Observable使用它发射事件给所有Observer。
- 使用Observable.subscribe添加一个Observer。
接着来看下Observer的几个方法:
- onSubscribe:在订阅observable时回调,可以在这里调用Disposable.dispose取消订阅或者将Disposable对象保存起来以便在后续某个时刻取消订阅。
- onNext:在ObservableEmitter.onNext执行后回调,onNext表示的是整个响应链中的一环,在这里处理响应链中的其中一个任务,可以多次调用。
- onComplete:在ObservableEmitter.onComplete执行后回调,表示任务已全部完成,可以在这里做收尾工作。
- onError:在ObservableEmitter.onError执行后或者链中任一环节出现异常时回调,表示任务执行失败。
除了onSubscribe,其它几个方法有如下特点:
- 和ObservableEmitter中的同名方法一一对应,在ObservableEmitter的同名方法执行后回调。
- 
onComplete和onError互斥,两者只能触发其中之一,且触发后onNext便不会再触发。
- 
onComplete触发后,后续ObservableEmitter调用任何方法都不会再生效。
- 
onError触发后,如果ObservableEmitter再调用onError或者onComplete,RxJava会抛出异常,开发者需要自行保证唯一性。(处理方式为什么不同onComplete?)
需要特别注意的,ObservableOnSubscribe.subscribe方法在每次有新的Observer加入时,都会在Observer.onSubscribe回调之后触发,这就保证了所有的Observer都能接收到事件。
subscribe
在上面提到了通过subscribe为Observable和Observer建立了绑定关系,我们来看下方法原型:
void subscribe(Observer<? super T> observer)
除此之外,subscribe还有很多重载方法,我们来看下所有的方法原型:
Disposable subscribe();
Disposable subscribe(Consumer<? super T> onNext);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}
以上方法最终都会调用到参数最多的那个方法,而该方法内部又调用了subscribe(Observer<? super T> observer),这些方法相对来说更为简洁易用。可以发现新增了两个类,分别是Consumer和Action,其中Observer的onNext、onError和onSubscribe方法被Consumer.accept方法取代,onComplete方法被Action.run方法取代。因此,如果我们仅仅只关心Observer的其中一个或多个回调,那么便可以通过Consumer或Action来代替Observer注册到Observable中。如上述的例子中,如果我们只关心onNext,那么可以这么使用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onNext("C");
        emitter.onComplete();
    }
});
observable.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log("onNext:" + s);
            }
            }, Functions.<Throwable>emptyConsumer(),
        new Action() {
            @Override
            public void run() throws Exception {
                log("onComplete");
            }
        }
);
使用lambda表达式之后的代码如下:
Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
    );
observable.subscribe(
        (String s) -> log("onNext:" + s),
        Functions.<Throwable>emptyConsumer(),
        () -> log("onComplete")
);
代码显得简洁优雅易读得多,实际上RxJava是为响应式编程和函数式编程而生,因此使用lambda表达式才能完美的使用RxJava。
Android使用lambda表达式需要使用jdk1.8,且gradle版本需要4.0以上(也可能是3.x某个版本,记不住了,4.0以上准没错),在build.gradle中加入如下代码:
android { compileOptions { sourceCompatibility > JavaVersion.VERSION_1_8 targetCompatibility > JavaVersion.VERSION_1_8 } }
线程调度
以下称Observer.onXXX回调时所在线程为Observer工作线程,Observable发射事件时所在线程为Observable工作线程。默认情况下,Observer和Observable工作线程是同一个,该线程即调用Observable.subscribe时所在的线程。然而,异步调用才是RxJava的核心应用场景,下面我们来看下如何改变Observer和Observable的工作线程。使用Observable.subscribeOn配置Observable工作线程,使用Observable.observeOn配置Observer工作线程。很多情况下,置Observable工作线程位于子线程中,因为可能存在网络请求、数据存取等耗时操作;而Observer工作线程位于主线程,因为接收到事件后需要刷新UI。下面来看下这种场景下的应用示例:
Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
);
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                (String s) -> log("onNext:" + s + " thread=" + Thread.currentThread().getName()),
                Functions.<Throwable>emptyConsumer(),
                () -> log("onComplete thread=" + Thread.currentThread().getName())
        );
日志输出如下:
emit thread=RxNewThreadScheduler-2
onNext:A thread=main
onNext:B thread=main
onNext:C thread=main
onComplete thread=main
可以看到线程调度确实如我们所期望的了。需要特别强调的,subscribeOn和observeOn返回的不是原本的Observable对象,因此如果没有采用链式调用,在调用这两个方法之后必须重新赋值给Observable对象,如:
// 错误调用
observable.subscribeOn(xxx)
    .observeOn(xxx);
observable.subscribe(xxx);
// 正确调用
observable = observable.subscribeOn(xxx)
    .observeOn(xxx);
observable.subscribe(xxx);
如果重复调用subscribeOn和observeOn会怎么样呢,我们来看段代码:
Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
        }
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.single())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
日志输出如下:
emit thread=main
onNext1:A thread=RxSingleScheduler-1
可以得出如下临时结论:
- 
subscribeOn以第一次调用为准。
- 
observeOn以最后一次调用为准。
我们再来看下更复杂的例子:
Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
        }
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.single())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
observable = observable.subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io()) ;
observable.subscribe((String s) -> log("onNext2:" + s + " thread=" + Thread.currentThread().getName()));
日志输出如下:
emit thread=main
emit thread=main
onNext1:A thread=RxSingleScheduler-1
onNext2:A thread=RxCachedThreadScheduler-2
现在,我们可以得出最终结论了:
- 
subscribeOn以第一次调用为准。
- 
observeOn以调用subscribe前的最后一次调用为准,每个subscribe单独计算。
由此可知,我们可以让不同的Observer在不同线程中调度。
RxJava使用Scheduler来表示线程调度,上面提到的Schedulers.newThread()和AndroidSchedulers.mainThread()都是由RxJava提供的Scheduler实现类。一般我们不需要手动去实现Scheduler,而是通过Schedulers或者AndroidSchedulers(Android专用)获取。下面分别来看下二者所提供的创建能力。
Schedulers
newThread
大致等同于new Thread(runnable).start();,线程数没有上限,除了测试场景一般不会用到它。
io
用于I/O操作场景,线程数没有上限。与newThread比较相似,区别在于该调度器的内部使用了一个无数量上限的线程池,可以复用空闲的线程,因此效率更高。
computation
用于计算场景,计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,因此不要把I/O操作放在这里。该类型的Scheduler使用固定数量的线程池,数量为处理器核数。除了阻塞(包括I/O操作、wait等)外,其他操作都可以使用该调度器,不过通常用处理事件循环,大数据运算等。
single
单线程调度,所有任务都需要排队依次运行。
trampoline
任务在当前线程运行。
from(Executor executor)
使用指定的线程池调度。
AndroidSchedulers
mainThread
任务在主线程上运行。
from(Looper looper)
任务在指定Looper上调度。