使用
导包
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0-RC8'
基本使用
// Basic usage: emit three values, complete, then attempt one more emission.
Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
    // Emitted after onComplete() on purpose: the observer below only prints 1 2 3.
    emitter.onNext(4)
}).subscribe(object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        // print > onSubscribe: io.reactivex.rxjava3.internal.operators.observable.ObservableCreate$CreateEmitter
        Log.i(TAG, "onSubscribe: ${d.javaClass.name}")
    }

    override fun onNext(t: Int) {
        // print > 1 2 3
        Log.i(TAG, "onNext: $t")
    }

    // The parameter is non-null, matching the other overrides in this observer
    // (RxJava 3 annotates the Throwable passed to onError as @NonNull).
    override fun onError(e: Throwable) {
        // Pass the throwable along so the stack trace is not lost.
        Log.e(TAG, "onError", e)
    }

    override fun onComplete() {
        Log.i(TAG, "onComplete")
    }
})
配合Retrofit使用
/**
 * Retrofit service declaration used by the example below.
 */
interface IFreeService {
    /**
     * Declares a GET request to `api/personalMessage/get/{id}`.
     *
     * @param personalId value substituted for the `{id}` path segment.
     * @return a [Single] carrying a list of [Repo].
     */
    @GET("api/personalMessage/get/{id}")
    fun updatePersonalMessage(@Path("id") personalId: String): Single<List<Repo>>
}
val retrofit = Retrofit.Builder()
    .baseUrl(baseUrl)
    .addConverterFactory(GsonConverterFactory.create())
    // Needed so Retrofit can return RxJava 3 types such as Single; without a call adapter
    // factory the service method fails with "Unable to create call adapter".
    // RxJava3CallAdapterFactory (package hu.akarnokd.rxjava3.retrofit) is provided by the
    // com.github.akarnokd:rxjava3-retrofit-adapter dependency.
    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
    .build()
val api = retrofit.create(IFreeService::class.java)
api.updatePersonalMessage(personalId = "1")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<List<Repo>> {
        // Called right after subscribing, before the network request is made;
        // a place for initialization work.
        override fun onSubscribe(d: Disposable) {
        }

        override fun onSuccess(t: List<Repo>) {
        }

        override fun onError(e: Throwable) {
        }
    })
注意:页面关闭时需要调用 Disposable 的 dispose() 取消订阅(解绑),避免内存泄漏。
Observable源码解析
Observable#subscribe
// Subscribes the given observer to this Observable (excerpt: the catch body is elided as "...").
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
// Reject a null observer up front.
Objects.requireNonNull(observer, "observer is null");
try {
// Pass the observer through the RxJavaPlugins.onSubscribe hook; the hook's return value replaces it.
observer = RxJavaPlugins.onSubscribe(this, observer);
// The hook itself must not return null.
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// Key step: invoke this Observable's own subscribeActual method.
subscribeActual(observer);
}catch (Throwable e) {
...
}
}
Observable.create 创建出来的被观察者是 ObservableCreate 对象,因此这里实际执行的是 ObservableCreate 的 subscribeActual 方法。
ObservableCreate#subscribeActual
// Subscribes the downstream observer by handing the source a CreateEmitter that wraps it.
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Create the emitter and give it the downstream observer.
CreateEmitter<T> parent = new CreateEmitter<>(observer);
// The downstream receives its onSubscribe callback, with the emitter as the argument.
observer.onSubscribe(parent);
try {
// Subscribe the upstream source, passing it the newly created emitter.
source.subscribe(parent);
} catch (Throwable ex) {
// Anything thrown by source.subscribe goes through Exceptions.throwIfFatal
// and is then reported through the emitter's onError.
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这相当于在原有的被观察者与观察者之间插入了一层 CreateEmitter:上游把数据发送给 CreateEmitter,由它做空值校验和是否已 dispose 的判断后,再转发给下游。
CreateEmitter
// Emitter that forwards events to `observer` while tracking disposal
// (excerpt: members elided as "..." are not shown, including the `observer` field).
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
// Forwards a value to the observer unless this emitter is disposed.
@Override
public void onNext(T t) {
// A null value is not forwarded; it is turned into an onError call instead.
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
// Delivers the error via tryOnError; if that returns false the error is handed to RxJavaPlugins.onError.
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
// Returns true if the error was passed to the observer, false if this emitter was already disposed.
@Override
public boolean tryOnError(Throwable t) {
// A null Throwable is replaced with a NullPointerException.
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// Dispose after the terminal event, even if observer.onError throws.
dispose();
}
return true;
}
return false;
}
// Signals completion to the observer unless already disposed, then disposes this emitter.
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// Dispose after the terminal event, even if observer.onComplete throws.
dispose();
}
}
}
// Disposes this emitter through DisposableHelper.
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
// Checks the Disposable currently held by this AtomicReference.
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
总结:
- CreateEmitter 接收到上游发送的数据后,如果尚未 dispose(解绑),会直接把数据转发给下游。
- onComplete 和 onError 在回调下游之后都会调用 dispose 方法;此后再调用 onNext,由于 isDisposed() 返回 true,下游也就接收不到数据了。
取消订阅
取消订阅后,上游停止生产数据,也不再向下游传递数据。
讲解:Observable的delay操作符
结合上面的 Observable 源码解析可知:delay 方法会生成 ObservableDelay 对象,链式调用最终订阅(subscribe)时会调用它的 subscribeActual 方法。
// Subscription logic of the delay operator: puts a DelayObserver between the source and the downstream observer.
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super T> t) {
Observer<T> observer;
// With delayError set, the downstream observer is used as-is; otherwise it is wrapped in a SerializedObserver.
if (delayError) {
observer = (Observer<T>)t;
} else {
observer = new SerializedObserver<>(t);
}
// Create a worker from the scheduler and pass it to the DelayObserver.
Scheduler.Worker w = scheduler.createWorker();
// Subscribe a DelayObserver to the upstream source.
source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
}
DelayObserver
// Observer that re-delivers upstream events to `downstream` by scheduling them on the worker `w`.
// Excerpt: only `upstream` is declared here; `downstream`, `w`, `delay`, `unit` and `delayError`
// are used below but their declarations are not shown.
static final class DelayObserver<T> implements Observer<T>, Disposable {
Disposable upstream;
@Override
public void onSubscribe(Disposable d) {
// Keep the upstream Disposable; the downstream is handed this DelayObserver as its Disposable.
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
// Schedules delivery of the value after `delay`.
@Override
public void onNext(final T t) {
w.schedule(new OnNext(t), delay, unit);
}
// Schedules delivery of the error: after `delay` when delayError is set, otherwise with a delay of 0.
@Override
public void onError(final Throwable t) {
w.schedule(new OnError(t), delayError ? delay : 0, unit);
}
// Schedules delivery of the completion signal after `delay`.
@Override
public void onComplete() {
w.schedule(new OnComplete(), delay, unit);
}
@Override
public void dispose() {
// Cancel the upstream.
upstream.dispose();
// Dispose the worker, cancelling the delayed delivery tasks.
w.dispose();
}
// Disposal state is reported from the worker.
@Override
public boolean isDisposed() {
return w.isDisposed();
}
// Task that hands one value to the downstream observer.
final class OnNext implements Runnable {
private final T t;
OnNext(T t) {
this.t = t;
}
@Override
public void run() {
downstream.onNext(t);
}
}
// Task that hands the error to the downstream observer and then disposes the worker.
final class OnError implements Runnable {
private final Throwable throwable;
OnError(Throwable throwable) {
this.throwable = throwable;
}
@Override
public void run() {
try {
downstream.onError(throwable);
} finally {
// Dispose the worker even if downstream.onError throws.
w.dispose();
}
}
}
// Task that signals completion to the downstream observer and then disposes the worker.
final class OnComplete implements Runnable {
@Override
public void run() {
try {
downstream.onComplete();
} finally {
// Dispose the worker even if downstream.onComplete throws.
w.dispose();
}
}
}
}