RxJava笔记
前言
看此篇之前最好知道RxJava的使用。由于RxJava内部源码实现有点复杂,既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,只留下核心代码,加上我个人的理解,带大家揭秘RxJava的实现原理(本文不涉及框架的使用介绍)。
一、构建观察者类
Subsribler在RxJava里面是一个抽象类,它实现了Observer接口。
public interface Observer<T> {
void onCompleted();
void onError(Throwable throwable);
void onNext(T value);
}
public abstract class Subscriber<T> implements Observer<T>{
public void onStart(){
}
}
二、构建被观察者
Observable(被观察者)拥有很多工厂方法和各式各样的操作符。每个Observable里面都维护了一个OnSubscribe对象,并通过subscribe()里面的call(Subscriber<? super T> subscriber)方法与观察者产生联系。
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
private Observable(OnSubscribe<T> onSubscribe){
this.onSubscribe = onSubscribe;
}
public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){
return new Observable<T>(onSubscribe);
}
public void subscribe(Subscriber<T> subscriber){
subscriber.onStart();
onSubscribe.call(subscriber);
}
public interface OnSubscribe<T>{
void call(Subscriber<? super T> subscriber);
}
}
三、RxJava的事件流雏形产生
通过上面写的观察者和被观察者,即可写出一个没有操作符和线程切换功能的简易版Rxjava。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 10; i++){
subscriber.onNext(i);
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer value) {
System.out.println("Result: "+value);
}
});
通过Observable.create将OnSubscribe的匿名类传给Observable,在subscribe()时回调OnSubscribe接口中的call方法,同时call方法参数即为subscribe的参数,即观察者,因此继续回调subscriber.onNext()即可完成观察者里的逻辑。
结果如下:
四、玩转RxJava里的操作符
RxJava之所以强大好用,与其拥有丰富灵活的操作符是分不开的。那么我们就试着为这个框架添加一个最常用的操作符:map。先看代码:
public <R> Observable<R> map(final Fun1<T, R> transformer){
return create(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> subscriber) {
Observable.this.onSubscribe.call(new Subscriber<T>() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onNext(T value) {
subscriber.onNext(transformer.transfer(value));
}
});
}
});
}
public interface Fun1<T, R>{
R transfer(T from);
}
测试代码
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 10; i++){
subscriber.onNext(i);
}
}
}).map(new Observable.Fun1<Integer, String>() {
@Override
public String transfer(Integer from) {
return String.valueOf(from)+"_Map";
}
}
).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(String value) {
System.out.println("Result: "+value);
}
});
结果如下:
其实RxJava每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。桥接的Observable内部会实例化新的OnSuscribe和Subscriber。
新建的OnSuscribe的call方法负责持有目标Subscriber,此时就可以回调subscriber的方法来完成观察的行为了。但是这是还没有数据源,想要获得数据源必须调用源Observable.OnSubscribe的subscribe方法,传入一个新的Subscriber,这样就可以在它的onNext()方法中获得数据源,并经过传入的接口处理后,发送给最终的Subscriber。
总体来说就是源Observable.OnSubscribe将Event往下发送给桥接Observable.Subscriber,最终桥接Observable.Subscriber将Event做相应处理后转发给目标Subscriber。
五、RxJava里的线程切换
RxJava中最激动人心的功能是异步处理,能够自如地切换线程。
利用subscribeOn() 结合observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。 observeOn() 可以多次调用,Subscriber的执行线程与最后一次observeOn()的调用有关。但subscribeOn() 多次调用只有第一个subscribeOn() 起作用。
这是因为 observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe,这时事件还没开始发送,因此subscribeOn()的线程控制可以从事件发出的开端就造成影响。
线程调度除了桥接Observable以外,RxJava还用到一个很关键的类Scheduler(调度器)。
5.1 Scheduler核心代码如下:
public class Scheduler {
private final static Scheduler ioScheduler
= new Scheduler(Executors.newSingleThreadExecutor());
Executor executor;
public Scheduler(Executor executor){
this.executor = executor;
}
public Worker createWorker(){
return new Worker(executor);
}
public static class Worker {
Executor executor1;
public Worker(Executor executor1){
this.executor1 = executor1;
}
public void schedule(Runnable runnable){
executor1.execute(runnable);
}
}
public static Scheduler io(){
return ioScheduler;
}
}
具体的Scheduler的实现类就不看了,但我们需要知道,能做到线程切换的关键是Worker的schedule方法,因为它会把传过来的任务放入线程池,并在新线程中执行。
5.2 实现observeOn
observeOn是作用于下层Subscriber的,需要让下层Subscriber的事件处理方法放到新线程中执行。为此,在Observable类里面,添加如下代码:
public Observable<T> observeOn(final Scheduler scheduler){
return create(new OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
subscriber.onStart();
final Scheduler.Worker worker = scheduler.createWorker();
Observable.this.onSubscribe.call(new Subscriber<T>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(final T value) {
worker.schedule(new Runnable() {
@Override
public void run() {
subscriber.onNext(value);
}
});
}
});
}
});
}
测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 10; i++){
subscriber.onNext(i);
}
}
}).map(new Observable.Fun1<Integer, String>() {
@Override
public String transfer(Integer from) {
return String.valueOf(from)+"_Map";
}
}
).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(String value) {
System.out.println("Result: "+Thread.currentThread().getName());
}
});
结果如下:
5.3 实现subscribeOn
subscribeOn是作用于上层OnSubscribe的,可以让OnSubscribe的call方法在新线程中执行。
因此,在Observable类里面,添加如下代码:
public Observable<T> subscribeOn(final Scheduler scheduler){
return create(new OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {
Observable.this.onSubscribe.call(subscriber);
}
});
}
});
}
测试代码如下:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("Observable thread: "+Thread.currentThread().getName());
for(int i = 0; i < 10; i++){
subscriber.onNext(i);
}
}
}).map(new Observable.Fun1<Integer, String>() {
@Override
public String transfer(Integer from) {
System.out.println("Map Observable thread: "+Thread.currentThread().getName());
return String.valueOf(from)+"_Map";
}
}
).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(String value) {
// System.out.println("Result: "+Thread.currentThread().getName());
}
});
结果如下:
六、总结
相信看RxJava这个简易版的设计对大家的启示,比网上的一些源码解析清晰的多,希望可以抛砖引玉。有时候我们总是认为看几篇博文貌似当时就懂了明白了,但是这种理解或者说记忆貌似不持久。过了一段时间总是还给博主了。学习还是得深入源码,从源码中学习,然后在结合其他人的博客查漏补缺,这样才是自己的东西。大家有兴趣可以把flatMap等其他操作符来自己实现一下。