因为工作需要刚好在学习 RxJava + Retrofit2 + OkHttp3 网络请求框架,网上搜了一些 RxJava 的教程,并不是很好理解,所幸最后我找到了几篇有助于初学者了解 RxJava 的文章,于是结合自己的理解,重新整理成一篇发给大家,希望通过我的咀嚼,能够帮助大家更快的了解和上手 RxJava,尤其是文章的最后,你将理解是怎样一种优势,乃至于我们开始考虑用 RxJava 来替代传统方式。话不多说,以下正文。
观察者模式
RxJava 涉及到所谓的观察者模式。
观察者模式,按我的理解,就是所谓的被观察者和订阅者之间的关系。
其形式是,在作为被观察者的一个方法中,包含着一段订阅,该订阅的本质是抽象类或接口的引用对抽象方法的调用,而该订阅实际指向的是作为实现类的订阅者具体实现的方法。
(这实际上就是用到了面向对象编程中“向上转型”的概念。向上转型的套路是:实现类实现了抽象类定义的抽象方法;将实现类的对象赋值给抽象类的引用;然后通过抽象类的引用来调用抽象方法,而该调用实际指向实现类具体实现的方法……好吧,我这里相当于把面向对象的基础又过了一遍,不懂向上转型概念的先滚去看 Mars 的 Java4Android 教程 - - )
//抽象类或接口
public interface Printer{
void print(String s);//定义抽象函数
}
//实现类实现抽象函数
class CanonPrinter implement Printer{
@override
public void print(String s){
System.out.println(s);
}
}
//在被观察者中,通过抽象类或接口的引用来调用抽象函数
private static void printSth(Printer printer){
printer.print("ha ha ha");
}
//执行被观察者时,传入实现类的对象作为实参赋值给抽象类或接口的引用,
//从而在被观察者中通过抽象类或接口的引用来调用print抽象方法,
//而该抽象方法实际指向实现类实现的print方法
public static void main(String[] args){
printSth(new CanonPrinter());
}
所以若是真要理解成观察和被观察,按照这个逻辑也的确说的过去,也就是说,被观察者刚好触发了这里,然后直接跳转到订阅者实现的方法内部去具体执行被触发后该做出的反应。
被观察者
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("aaa");
sub.onNext("bbb");
sub.onCompleted();
}
}
);
订阅者
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
};
关联起来
myObservable.subscribe(mySubscriber);
有没有觉得和
btn.setOnClickListener(onClickListener)很相似,是的,就是这样。
这一步就是将实现类的对象赋值给抽象类或接口的引用。
不过,观察者和订阅者的创建也有简化版。
例如当观察者只触发一个事件(onNext)时,可以直接使用 just。
观察者
Observable<String> myObservable = Observable.just("Hello, world!");
订阅者
当不关心 OnComplete 和 OnError,只需要在 onNext 时做一些处理,可以用 Action1 类
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
};
subscribe 方法有一个重载版本,接受1~3个 Action1 类型的参数,分别对应 OnNext,OnError,OnComplete
然后我们现在只需要 onNext,就只需要传入一个参数
myObservable.subscribe(onNextAction);
这样,上述的代码最终可以写成
Observable.just("aaa")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
注意,泛型类不能直接写在类中作为对象的实例变量,当它没有被指定类型时,因为泛型是编译时,不是运行时。上述代码因为 Observable 不指定具体<T>,因此,你应写在方法中,使 Observable 作为局部变量。
而使用 lambda,可以使代码变得更简洁
Observable.just("aaa")
.subscribe(s -> System.out.println(s));
那如果我想做一些手脚怎么办,特别是,观察者不能被改动,订阅者也不能被改动的时候
这时我们可以通过操作符 map 来完成改动(操作符还有很多,负责中间过程改动的,是map)
例如我给输入的文字加个后缀
Observable.just("aaa")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + ".jpg";
}
})
.subscribe(s -> System.out.println(s));
lambda:
Observable.just("aaa")
.map(s -> s + ".jpg")
.subscribe(s -> System.out.println(s));
或者在中间过程获得类型不同的数据
Observable.just("aaa")
.map(s -> s.hashCode())
.map(i -> Integer.toString(i))
.subscribe(s -> System.out.println(s));
那么到这里为止,我们知道了,RxJava 本质上和我们之前用的“回调”性质是一样的,就是作为观察者的一方调用抽象函数,作为订阅者的另一方实现抽象函数,并且二者通过 subscribe 方法关联,也就是,在观察者的 subscribe 方法中实现订阅者的匿名内部类,来具体实现被触发的方法。
但是故事说到这里,好像还是没有异步什么事啊,它分明就是“同线程中的身首异处”嘛。
下面来看看操作符
操作符
上述我们已经提到了操作符 map。
然后此处我们有个这样的需求:输入一个关键字,返回相关结果的 url 列表。相当于搜索引擎。
一般我们会这么做
Observable<List<String>> query(String text);
query("Hello, world!")
.subscribe(urls -> {
for (String url : urls) {
System.out.println(url);
}
});
有for循环,这样看起来有点繁杂
我们可以通过操作符 from,对数组中的数据进行逐个处理
Observable.from(urls)
.subscribe(url -> System.out.println(url));
也就是
query("Hello, world!")
.subscribe(urls -> {
Observable.from(urls)
.subscribe(url -> System.out.println(url));
});
但这样破坏了 RxJava 的结构,因为订阅者最好是保持本身单一的功能,而数据的改变最好在中间过程中通过操作符来完成。
此时我们可以通过操作符 flatMap 来处理这个问题。
Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个Observable
query("Hello, world!")
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.subscribe(url -> System.out.println(url));
lambda:
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.subscribe(url -> System.out.println(url));
flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List<String>,而是收到一些列单个的字符串,就像 Observable.from() 的输出一样
接着前面的例子,现在我不想打印 url 了,而是要打印收到的每个网站的标题。我当然不能直接将 getTitle 方法放进订阅者, 因为一再强调了,订阅者实现的功能要单一,不要轻易改动。那我将考虑在中间过程中来完成这个改动,这样的话,我的 getTitle 方法每次只能传入一个 url,并且返回值不是一个 String,而是一个输出 String 的 Observable 对象。flatMap 不正是这么运作的吗(通过输入的 Observable 对象得到里面的数据,再将数据输出,以 Observable 对象的方式传递),所以还是考虑使用 flatMap。
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String url) {
return getTitle(url);
}
})
.subscribe(title -> System.out.println(title));
lambda:
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.subscribe(title -> System.out.println(title));
那么如果我想过滤掉一些情况,例如返回 url 为 null 的,我不要它们的标题了,过滤掉,可以用操作符 filter。
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.subscribe(title -> System.out.println(title));
如果想指定输出元素的数量,可以用 take。
以下限制输出 5 个。
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.take(5)
.subscribe(title -> System.out.println(title));
如果想在每次触及订阅之前,完成一些事情,可以用 doOnNext。
例如以下在输出之前,将标题保存到某处。
query("Hello, world!")
.flatMap(urls -> Observable.from(urls))
.flatMap(url -> getTitle(url))
.filter(title -> title != null)
.take(5)
.doOnNext(title -> saveTitle(title))
.subscribe(title -> System.out.println(title));
其他的操作符还可以来这篇文章了解
https://mrfu.me/2016/01/10/RxWeekend/#tips7
介绍完操作符,现在开始讲重点。
为什么用 RxJava,它和传统的回调以及异步处理相比存在什么优势?
错误处理
Observable.just("Hello, world!")
.map(s -> potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { System.out.println("Completed!"); }
@Override
public void onError(Throwable e) { System.out.println("Ouch!"); }
});
代码中的 potentialException() 和 anotherPotentialException() 有可能会抛出异常。每一个 Observerable 对象在终结的时候都会调用 onCompleted() 或者 onError() 方法,所以 Demo 中会打印“Completed!”或者“Ouch!”
这种模式有以下几个优点:
1.只要有异常发生 onError() 一定会被调用
这极大的简化了错误处理。只需要在一个地方处理错误即可以。
2.操作符不需要处理异常
将异常处理交给订阅者来做,Observerable 的操作符调用链中一旦有一个抛出了异常,就会直接执行 onError() 方法。
3.你能够知道什么时候订阅者已经接收了全部的数据
知道什么时候任务结束能够帮助简化代码的流程。
这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。
使用 RxJava,Observable 对象根本不需要知道如何处理错误!操作符也不需要处理错误状态:一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做。
(这里的意思是说,传统的方式,就算也存在中间过程,那错误也需要在中间过程的每一步中分别处理。而 RxJava 不需要在中间过程处理错误,有错误直接跳到最后,统一由订阅者处理)
调度器
使用 RxJava,你可以使用 subscribeOn() 指定观察者代码运行的线程,使用 observerOn() 指定订阅者运行的线程:
(这个过程简单的说就是,订阅者在主线程观察被观察者,所以这个动作被成为 observeOn。而被观察者在异线程添加了订阅者的订阅,所以这个动作叫 subscribeOn,其实说 be subscribe on 更合适)
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
subscribeOn() 和 observerOn() 可以被添加到任何 Observable 对象上,这两个也是操作符。我不需要关心 Observable 对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度
如果使用 AsyncTask 或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用 RxJava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以
订阅
当调用 Observable.subscribe(),会返回一个 Subscription 对象。这个对象代表了被观察者和订阅者之间的联系
Subscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));
subscription.unsubscribe();
RxJava 的另外一个好处就是它处理 unsubscribing 的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用 unsubscribe 将会在他当前执行的地方终止。不需要做任何额外的工作!
最后,再次声明一遍,这篇文章的学习,其主线是参考自以下这篇博客,是一篇对国外大神的译文。
http://blog.csdn.net/lzyzsd/article/details/41833541/
后面的文章,链接在这里,目前先没有这个打算继续看下去……
http://blog.csdn.net/lzyzsd/article/details/45033611/
以及有人推荐这篇,不知道讲的什么鬼~
http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/