1、什么是观察者模式:
观察者模式是基于Subject这个概念的,Subject是一种特殊对象,又叫主题或者被观察者。当改变时,那些由他保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。
2、扩展的观察者模式
在Rxjava中主要有4个角色
1、Observable
2、Subject
3、Observer
4、Subscriber
Observable和Subject是两个“生产实体”,Observer和Subscriber是两个“消费实体”。直白的说Observable对应于观察者模式中的被观察者,而Observer和Subscriber对应观察者模式中的观察者。而Subscriber其实是一个实现了Observer的抽象类。Subject比较复杂。
Rxjava如何使用的
第一步:创建观察者Observer
Observer observer = new Observer() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
};
开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就相当于普通观察者模式中的update
RxJava中添加了普通观察者模式缺失的三个功能:
RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;
当事件处理出现异常时框架自动触发onError()方法;
同时Observables支持链式调用,从而避免了回调嵌套的问题。
第二步:创建被观察者Observable
Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。
Observable observable1 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Object o) {
}
});
除了create能创建Observable,还有just和from也能创建
just(T)方式
/***第二种:创建被1观察者**/
Observable observable2 = Observable.just("one", "two", "three", "foth");
//上面代码会调用
//onNext("one");
//onNext("two");
//onNext("three");
//onNext("foth");
//onCompleted();
from(T[])/form(Iterable<? extends T>)
/***第三种创建被观察者*/
String[] array = {"one", "two", "three", "foth"};
Observable observable3 = Observable.from(array);
//onNext("one");
//onNext("two");
//onNext("three");
//onNext("foth");
//onCompleted();
第三部:被观察者Observable订阅观察者Observer
这里就不同于普通的观察者模式,这里是被观察者订阅观察者
Observable.subscribe(obserber);
连接起来写就是这样的
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i = 0; i < 5; i++){
subscriber.onNext(i);
}
}
}).subscribe(new Observer<Integer>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Toast.makeText(MainActivity.this, "create"+integer, Toast.LENGTH_SHORT).show();
}
});
上述代码一旦调用subscribe()订阅后就会自动触发call方法,call()中的Subscriber其实就是一个观察者observer,subscriber是实现了Observer的接口。
在看subscribe()这个方法内部将传进来的Observer做了一层代理,将他转换成Subscriber。看看源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
返回的是observable.onSubscribe,正是create()方法中的Subscribe。
subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1,Action0,这是一种简单的回调,只有一个call(T)方法。
Observable.just("1111", "2222")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Toast.makeText(MainActivity.this, "just print log"+s, Toast.LENGTH_SHORT).show();
}
});
Observable.from(str)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, "just print log", Toast.LENGTH_SHORT).show();
}
});
Schedulers线程的切换
合起来的链式操作就是这样:
Observable.from(str)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Toast.makeText(MainActivity.this, "just print log", Toast.LENGTH_SHORT).show();
}
});