前言
随着RxJava越来越火爆,很多人想入门却不知道从何做起,笔者整理了一天,总结出一些比较实用的东西,望各位看官斧正。
一、RxJava到底是什么,有什么好处?
关键词:异步、简洁。
RxJava可以进行异步调用,使每一步都可以自行处理其所在的线程,典型的流式代码结构,像读一首诗一样顺畅。即使以后逻辑越来越复杂,关于Rx的代码依然很顺畅。为什么这么说,请往下看。
二、RxJava例子
先比较一下下方的两个代码:
new Thread(){
@Override
public void run() {
super.run();
//网络请求~~~~~之后
{
runOnUiThread(new Runnable() {
@Override
public void run() {
ts.setText("这是通过newThread");
}
});
}
}
}.start();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//异步操作网络请求后
{
subscriber.onNext("这是通过RxJava");
subscriber.onCompleted();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
ts2.setText(o.toString());
}
});
同样是开启新线程,从异步线程获取数据,然后再UI线程中写入,代码量相比较,下方的要比上面的多,但是对于易读性来讲,流式的代码结构,从上向下阅读,条理清晰,操作简单,不论以后逻辑修改多么复杂,几乎不会有太大的变化,不像第一部分的代码,如果说复杂度增加的话,就会很麻烦。
三、RxJava使用方法
本文是基于Intelli JDEA 进行的开发,使用了gradle,只需要加一行配置引入即可:
compile 'io.reactivex:rxjava:1.2.1'
概念基础
RxJava使用的设计模式为观察者模式,主要的步骤就是创建观察者,被观察者,订阅。比较熟悉的同学可以略过这一段。
如上图所示,观察者模式可有四个模块:
被观察者接口类:主要提供接口声明包括订阅 解除订阅 通知功能,和保存观察者对象。
观察者接口类:主要提供一个方法供被观察者使用,当被观察者触发了一些条件后,通知观察者。
被观察者实现类及观察者实现类:实现以上接口。
举个简单的例子,在家懒得做饭订了外卖,跟送外卖的说(被观察者),等你到楼下了给我打电话(订阅),等外卖到的时候,就会给你打电话,通知你去取外卖(通知),这就是一个观察者模式应用。
使用流程
RxJava的开发主要分为三个步骤:apple、pen、Ah~
1、Observer (观察者)
Observer属于观察者,我们可以看一下源码:
public interface Observer<T> {
onCompleted();
onError(Throwable e);
void onNext(T t);
}
很简单,就三个方法,代表着完成(onCompleted),错误(onError),接收数据(onNext)。
另外还有一个实现了Observer的抽象类Subscriber,主要方法和Observer差不多,比较大的区别就是可以使用onStart()方法和unsubscribe()方法。
onStart():主要在事件未发出的时候调用,仅能在subscribe 所发生的线程被调用,可以做一些数据处理。
unsubscribe():用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。主要用于当某些对象不再使用的时候,但是并没有收到onCompleted消息的时候调用,防止出现内存泄露。
主要实现代码:
Subscriber observer = new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onStart() {
super.onStart();
System.out.println("----onStart----");
}
};
2、Observable(被观察者)
Observable为被观察者,由被观察者来决定是否发送消息,何时发送消息,异步操作一般都在这里进行,实现方式:
Observable observable = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
subscriber.onNext("111");
subscriber.onCompleted();
}
});
调用Observable的create方法来创建被观察者,其中call方法的参数即可视为第一步的Observer ,在进行订阅的时候,会直接执行call方法内的代码。
除了create方法外,还有简单的方法来直接实现,just(T...)方法和from(T[]) 方法
String[] txt = {"easd","dasd","dasd"};
Observable observable = Observable.from(txt);
Observable observable2 = Observable.just("easd","dasd","dasd");
查看源码后,其实实现的方法也是create方法,只是经过了一些简单的封装,这里就简单介绍一下from方法。
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}
又上面代码可知,from方法首先对数组进行了判断,然后return了一个Observable的对象,使用的是create()方法,那么OnSubscribeFromArray又做了什么呢?
public final class OnSubscribeFromArray<T> implements OnSubscribe<T>
这是OnSubscribeFromArray实际上也是实现了OnSubscribe接口,那么我们去看一下call()方法:
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;
long e = 0L;
int i = index;
for (;;) {
while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(array[i]);
i++;
if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}
r--;
e--;
}
r = get() + e;
if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
主要调用了这三个方法,当数组长度等于long的最大值时,调用fastPath(),反之则 slowPath(n),两个方法的主要目的其实也就是将数组进行遍历,然后分发出去。由此可知无论是just()还是from()都可以视为与create()方法等价。
3、Subscribe (订阅)
最激动人心的时候到了,之前的准备已经完成了之后,就可以将观察者和被观察者进行订阅。
observable.subscribe(observer);
observable.subscribe(subscriber);
Ah~~~~这样关系就定下来了。subscribe方法代码比较啰嗦,就简化一下发到下面:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
//1 !!
subscriber.onStart();
try {
//2 !!
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
主要就三步:
一调用subscriber的onStart()方法,
二调用被观察者observable的OnSubscribe的call方法,
三将subscriber变成Subscription 返回,可以直接使用unsubscribed()方法。
另外,subscribe方法支持不完整的回调,可以使用Action1和Action0作为回调:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
observable.subscribe(onNextAction,onErrorAction,onCompletedAction);
//分别对应onNext,onError,onComplete。
//也可以
observable.subscribe(onNextAction,onErrorAction);
observable.subscribe(onNextAction);
//在源码里都有不同的方法去对应
以上所说都是关于RxJava的使用方法,但是!跟特么所说的异步没毛关系,并没有什么卵用,下面开始正片
三、RxJava的异步调度
这个东西专门拿出来当一个大分类来讲,在RxJava里如果没有这个东西,要他有何用。这个神奇的东西叫Scheduler,线程调度器。主要用在两个方法里,observeOn()和subscribeOn(),分别对应观察者操作线程和被观察操作线程。
总共来讲有这么几个线程:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
还有个Schedulers.from(Executor executor),你可以自定义线程池来处理。
是不是很吊!
再次回到一开始放出来的代码:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//异步操作网络请求后
{
subscriber.onNext("这是通过RxJava");
subscriber.onCompleted();
}
}
})
.subscribeOn(Schedulers.io())//Observable操作放到io线程里
.observeOn(AndroidSchedulers.mainThread())//Observable操作放在UI线程里
.subscribe(new Action1<Object>() {//缺省的onNext调用
@Override
public void call(Object o) {
ts2.setText(o.toString());
}
});
这样是不是就很容易理解了?
今天就先到此为止,下一节主要介绍变换和Subject这个神奇的东西