关于RxJava具体是什么我就不多说了,这一系列文章我只打算对RxJava2.x的用法进行一些总结。
使用RxJava有以下3个步骤
- 创建一个Observable
- 创建一个Observer
- 使用subscribe关联Observable和Observer
接下来就来说明他们分别是什么
Observable
被观察者,被观察者会通知观察者进行相应的操作。
Observer
观察者,真正执行动作的是观察者,当观察到被观察者发出相应的信号时,观察者就会执行自身的逻辑。
Observable.just("Hello", "World")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
例如上面这段代码,被观察者Observable通过just操作符发出2个字符串,而Consumer是个观察者,在accept方法内打印出Observable发送的字符串。
subscribe有多个重载方法
subscribe(onNext);
subscribe(onNext,onError);
subscribe(onNext,onError,onComplete);
subscribe(onNext,onError,onComplete,onSubscribe);
1.onNext:Consumer类型
2.onError:Consumer类型
3.onComplete:Action类型
4.onSubscribe:Consumer类型
Consumer是单参数,而Action是无参数
Observable.just("Hello", "World")
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("doOnNext");
}
})
.doAfterNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("doAfterNext");
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnComplete");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("doOnSubscribe");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("doAfterTerminate");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("doFinally");
}
})
.doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> stringNotification) throws Exception {
System.out.println("doOnEach");
}
})
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("doOnLifecycle");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnLifecycle Action");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext --------->"+s);
}
});
I: doOnSubscribe
I: doOnLifecycle
I: doOnNext
I: doOnEach
I: onNext --------->Hello
I: doAfterNext
I: doOnNext
I: doOnEach
I: onNext --------->World
I: doAfterNext
I: doOnComplete
I: doOnEach
I: doFinally
I: doAfterTerminate
他们之间的调用顺序如上所示。
在RxJava2.x中有5种观察者模式
1.Observable和Observer
2.Flowable和Subscriber
3.Single和SingleObserver
4.Completable和CompletableObserver
5.Maybe和MaybeOberver
Observable和Observer能够发射0个或n个数据,以错误或成功事件终止。
Flowable和Subscriber能够发射0个或n个数据,以错误或成功事件终止,支持背压策略控制发射速度
Single和SingleObserver发送单个数据或错误事件
Completable和CompletableObserver不发射数据只能处理onComplete和onError
Maybe和MaybeOberver能够发射0个或1个数据,要么成功要么失败。
Hot Observable与Cold Observable
通过publish操作符把普通的Cold Observable转换为Hot Observable(ConnectableObservable)
如果不调用connect方法则不会开始发送事件
通过refCount操作符把Hot Observable转换为Cold Observable,跟踪所有的Observer,只有全部都取消的订阅,才会断开与Observable的链接。
share操作符则是publish().refCount的链式调用
Subject
既是Observable也是Observer
Completable
只有onComplete和onError事件,并且经常和andThen一起使用
Single
只能发送一个事件和接受一个事件
只有onSuccess和onError事件
Maybe
相当于Single和Completable的结合体
有onSuccess,onComplete,onError三个事件
Subject
- AsyncSubject 只发送最后一个
- BehaviorSubject 只发送订阅前一个和订阅后的所有
- ReplaySubject 发送全部
- PublishSubject 发送订阅后的全部
Processor
带背压的Subject