RXjava
RXjava是什么,有什么作用?如何使用它?
RXjava
是一个使用可观测的序列来组成异步的、基于事件的程序的库。简单的说,就是用一个简洁明了的程序序列,来组成异步程序。让异步操作更加容易读写。- 它的作用就是让异步,更简洁,而且当有大量异步操作的时候,仍然很简洁。
- 使用起来也不难。调用方法即可。
RXjava类图
如图所示,先从简单的说起。RXjava
类似于观察者模式,存在着观察者与被观察者,就相当于button
的onclicklistener
。需要对被观察者一举一动的瞬间有所反应,但是又不需要时刻观察被观察者的状态。
先从流程说起:
- 调用
Observable
的oncreate
方法,会返回一个observable
对象。而oncreate
方法中,是有一个onsubscribe
对象,这个对象中有个call
方法,在这个call
方法中,规定了之后observer
的执行步骤。 - 然后写一个
observer
,里面有onnext,oncompleted,onerror三个方法。这里还有一个subscriber
类,这个类继承自observer
,比它多两个方法,onstart,unsubscribe。这里onstart
顾名思义是在subscriber
开始的时候运行的,这个后面在subscribe
方法中会有调用,到时候再解释。unsubscribe
是subscriber
所实现的另外一个接口subscription
里面的方法,用于取消订阅。 - 这里是最关键的,在
observer
(或者说是subscriber
,因为在订阅的时候,observer
会被转换为subscriber
)和observable
都有了之后。就开始订阅了,方法是observable.subscribe(subscriber);
看起来比较像是被观察者订阅了观察者,其实不然,可以理解为被观察者与观察者建立了一种联系。subscribe方法其实是:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
这里调用了subscriber
的onstart
方法,然后调用了之前在oncreate
方法中定义的call
方法,然后return
一个subscriber
对象。通过这个对象判断是否在订阅,为unsubscribe
方法备用。
- 上面的方法还有拓展,比如直接调用
Observable.from(一个string数组);
,或者Observable.just("1","2","3");
这两个方法返回的是一个observable
对象,相当于包装了oncreate
方法。而oncreate
方法中,是有对接下来observer
的运行步骤有定义的,这里的定义就相当于
onNext("1");
onNext("2");
onNext("3");
onCompleted();
- 还有一种拓展是
Action1<string>()
;这个在RXjava类图中有解释。
知道上面这些流程,也没有卵用。因为RXjava最重要的是异步,而上面全部都是在同一个线程上面。所以接下来要在多线程上面执行这个框架。
在RxJava 中,Scheduler
——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler
,它们已经适合大多数的使用场景:
-
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 -
Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。 -
Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。
-Schedulers.computation()
: 计算所使用的 Scheduler。这个计算指的是CPU
密集型计算,即不会被I/O
等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为CPU
核数。不要把I/O
操作放在computation()
中,否则I/O
操作的等待时间会浪费CPU
。
另外,
Android
还有一个专用的AndroidSchedulers.mainThread()
,它指定的操作将在Android
主线程运行。
有了这几个Scheduler
,就可以使用subscribeOn()
和observeOn()
两个方法来对线程进行控制了。
-
subscribeOn()
: 指定subscribe()
所发生的线程,即Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程。
-
observeOn()
: 指定Subscriber
所运行在的线程。或者叫做事件消费的线程。
文字叙述总归难理解,上代码:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面这段代码中,由于
subscribeOn(Schedulers.io())
的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于
observeOn(AndroidScheculers.mainThread())
的指定,因此subscriber
数字的打印将发生在主线程 。事实上,这种在subscribe()
之前写上两句ubscribeOn(Scheduler.io())
和 observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
那么,加载图片将会发生在IO
线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。
变换
- 变换是最复杂的,也是最难理解的。暂且先说怎么使用,代码如下:
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
从上面的代码可以看出,
flatMap()
和map()
有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和map()
不同的是,flatMap()
中返回的是个Observable
对象,并且这个Observable
对象并不是被直接发送到了Subscriber
的回调方法中。flatMap()
的原理是这样的:
- 使用传入的事件对象创建一个
Observable
对象;- 并不发送这个
Observable,
而是将它激活,于是它开始发送事件;- 每一个创建出来的
Observable
发送的事件,都被汇入同一个Observable
,而这个Observable
负责将这些事件统一交给Subscriber
的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的Observable
将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()
所谓的flat
。