简述:
a library for composing asynchronous and event-based programs using obaservable for the
Java VM (一个对于构成使用的Java虚拟机观察序列号异步和基于事件的程序库)。
总结:随着程序逻辑变得越来越复杂,它依然能够保持简洁。
RxJava引入的目的:异步,代码更清晰
优点:采用观察者模式链式调用,简洁明了,以往实现子线程主线程切换需自己手动new Thread(推送线程池),
并且线程之间还需要使用handler进行通信,Rxjava一步到位,极其简单。
1.基础概念:
Observable(əbˈzɜ:vəbl):在观察者模式中称为“被观察者”
Observer(əbˈzɜ:və(r)):观察者模式中的“观察者”,可接收Observeable发送的数据
subscribe(səbˈskraɪb):订阅,观察者与被观察者,通过subscribe()方法进行订阅
subscriber(səbˈskraɪbə(r)):也是一种观察者,在2.0中它与Observer没什么实质的区别,不同的是Subscriber与Flowable联合使用
Flowable(f'laʊəbl):也是悲观餐者的一种,与Subscriber进行配合使用,实现背压操作
RxJava的异步实现方式:
让Observable开启子线程执行耗时操作,完成耗时操作后,触发回调,通知Observer进行主线程UI更新。
如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。
RxJava中默认Observer和Observable都在同一线程执行任务。
2.Rxjava常用操作符
from()操作符:
接受数组或集合,返回一个按参数列表顺序发射这些数据的Observable。
源码:
public final static <T> Observable<T> from(Iterable<? extents T> iterable){
return create(new OnSubscribeFromIterable<T>(iterable));
}
例如:
String[] array = {"Amy","Rookie","MLXG"};
Observable.from(array)
.subscribe(new Observer<String>(){
...
});
just()操作符:
接受1-9个参数,它们还可以是不同类型,返回一个按参数列表顺序发射这些数据的Observable。
例如:
Observable.just(1,2.4,"adb")
.subscribe(new Action1<String>(){
...
});
map()操作符:
把原来的Observable对象转换成另一个Observable对象,方便Observer获得想要的数据形式,一对一
列如:
Observable.just("images/logo.png") //输入类型 String
.map(new Func1<String,Bitmap>(){
@Verride
public Bitmap call(String filePath){ //参数类型 String
return getBitmapFromPath(filePath); //返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>(){
@Override
public void call(Bitmap bitmap){ //参数类型 bitmap
showBitmap(bitmap)
}
});
flatMap()操作符:
返回任何它想返回的Observable对象,一对多
列如:
Student[] students = ...?;
Subscriber<Course> subscriber = new Subscriber<Course>(){
@Override
public void onNext(Course course){
...
}
};
Observable.from(students)
.flatMap(new Func1<Student,Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
filter()操作符:
Func中对每项元素进行过滤处理,满足条件的元素才会继续发送,下面的过滤偶数。
列如:
Observable.just(2,3,23,54,15)
.filter(new Func1<Integer,Boolean>() {
@Override
public Boolean call(Integer integer){
return integer % 2 == 0;
}
})
.subscribe(new Observer<Integer>(){
@Override
public void onNext(Integer integer){
...
}
...
});
take()操作符:
输出最多指定数量的结果
列如:
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.take(3) //只发送前三个事件
...
doOnNext()操作符:
用来在观察者Observer.onNext()方法调用之前进行一些初始化操作,保存/缓存网络结果
例如:
Observable.just(1,2,3)
.doOnNext(new Action1<Integer>(){
@Override
public void call(Integer integer){
...
}
})
...
Merge()操作符:
合并多个Observable,按照加入的Observable顺序将各个元素传递。
例如:
Observable<Integer> obserable1 = Observable.just(2,12,34,32);
Observable<Integer> obserable2 = Observable.just(32,12,43,2);
Observable.merge(observable1,observable2)
.subscribe(new Observer<Integer>(){
...
});
zip()操作符:
将各个Observable个对应位置各个元素取出做操作,然后将结果传递。
例如:
Observable<Integer> observable1 = Observable.just(1,2,3);
Observable<Integer> observable2 = Observable.just(11,22);
Observable.zip(observable1,observable2,newFunc2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer integer1,Integer integer){
return integer1+integer2;
}
})
.subscribe(new Observable<Integer>(){
...
@Override
public void onNext(Integer integer){
//out 12、24、3
}
});
3.Scheduler(调度号)切换线程
Schedulers.immediate():
直接在当前线程运行,相当于不指定线程,默认
Schedulers.newThread():
总是启动新线程,并在新线程操作
Schedulers.io():
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;
对于普通的计算任务,请使用Schedulers.computation();
Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。
Schedulers.computation():
用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.trampoline():
当其它排队的任务完成后,在当前线程排队开始执行。
SubscribeOn\ObserveOn
subscribeOn():
指定Observable(被观察者)所在的线程,或叫做事件产生的线程。
observeOn():
指定Observer(观察者)所运行在的线程,或叫做事件消费的线程。
4.Fowable与Subscriber
当被观察者发射数据的速度大于观察者接收处理数据的速度,造成观察者的调度器中数据缓冲池无限堆积,
超出了缓冲池的最大容量,导致OOM.
例如:
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception{
int a = 0;
while(true){
e.onNext("data:"+(i++));
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>(){
@Override
public void accept(String s) throws Exception{
Thread.sleep(2000);
print(s);
}
});
而此时Flowable的背压策略就很好的解决这个问题.
例如:
Flowable.create(new FlowableOnSubscribe<String>(){
@Override
public void subcribe(@NonNull FlowableEmitter<String> e) throws Exception{
int i = 0;
while(true){
e.onNext("data:"+(i++));
}
}
},BackpressureStrategy.DROP) //超出缓冲池的数据丢弃
.subecribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>(){
Subscription subscription;
@Override
public void onSubsrcibe(Subscription s){
subscription = s;
subscription.request(1);
}
@Override
public void onNext(String s){
try{
Thread.sleep(2000);
}catch(InterruptedException e){
e.printStackTrace();
}
printThred(2);
subscription.request(1); //处理完了,再请求数据
}
...
});
该背压策略是超出缓冲池的数据被丢弃,而观察者要求处理一个 发送我一个数据。
Backpressure的策略
a.被观察者和观察者在异步线程的情况下,如果被观察者发射事件的速度大于
观察者接收事件的速度,就会产生Backpressure问题。
但是同步情况下,Backpressure问题不会存在。
b.Backpressure的策略仅仅是调度Subscriber接收事件,并不影响Flowable
发送事件。观察者可以根据自身实际情况按需拉取数据,而不是被动接收。
最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。
c.Backpressure的策略有5种:ERROR,BUFFER,DROP,LATEST,MISSING
ERROR:
用来缓存观察者处理不了暂时缓存下来的数据,缓冲池的默认大小为128,即只能缓存128个事件。
如果缓冲池溢出,就会立刻抛出MissingBackpressureException异常。
BUFFER:
即把默认容器为128的缓存池成一个大的缓存池,支持很多的数据,这种方式
比较消耗内存。
DROP:
当消费者处理不了的时候就丢弃,消费者通过request传入其需求n(事件个数),
然后生产着把n个事件传递给消费者供其消费,其他消费不掉的丢弃。
LATEST:
基本和DROP一致,消费者通过request传入需求n,然后生产者把n个事
件传递给消费者供其消费,其他消费不掉的事件就丢弃。
唯一区别是LATEST总能使消费者能够接收到生产者产生的最好一个事件。
MISSING:
没有缓冲池,接收第一个数据之后,后面的都丢弃。