概述
rxjava2想必大家都用的很熟练了,但我们大多数工程师又有多少是从源码来深入了解它呢,尤其是在找工作面试中,想必大家深有体会,知道怎么用,却不不知道怎么说。总的来说就是对原理不了解。
也许你由于工作原因没时间,也可能是其他原因,总之没了解,没关系我也没怎么了解。由于打算回家发展,辞掉了现在的工作,自己也很想写一篇关于rxjava2的文章,也不用再去写业务方面的代码,想想现在的生活状态,真的很nice。
由于Android studio默认没有rxjava2的相关api,所以第一步就是在github中找到rxjava2的库,当我在查找的时候已经出来了rxjava3了,有需要了解的可以 点击了解。这里我们只是针对rxjava2,rxjava3可能有一些新的api或什么的 但大多api还是通用的这里就不过多的阐述。
在日常的工作中rxjava使用的最多的就是在网络请求中,但如果你对rxjava运用的比较熟悉的话,它干的事情也是很多的,比如切线程,类型的转换,举个简单例子,比如输入的int类型的数据,通过rxjava可以转换成String类型。一个实体类转换成另外一个实体类。或者修改类中的成员变量,或者遍历集合等等都可以用rxjava来实现。其实这个库功能是相当强大的。我们也不要局限在只是使用在网络框架请求上。
网络框架使用的三方库依赖
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation "com.squareup.retrofit2:retrofit:2.5.0"
implementation "com.squareup.retrofit2:converter-gson:2.4.0"
implementation "com.squareup.retrofit2:adapter-rxjava2:2.4.0"
implementation "me.jessyan:retrofit-url-manager:1.4.0"
implementation "io.reactivex.rxjava2:rxkotlin:2.2.0"
implementation "io.reactivex.rxjava2:rxandroid:2.1.0"
implementation "com.squareup.okhttp3:okhttp:3.14.1"
implementation "com.google.code.gson:gson:2.8.5"
implementation "com.squareup.okhttp3:logging-interceptor:3.11.0"
注意:添加依赖后需要注明依赖来源 maven { url "https://jitpack.io" }
点击右上方的同步按钮,依赖就算配置好了。
讲解rxjava之前,先简单的搭建下网络请求的相关配置,我这里以github 为例
比如使用users/{user}/repos网络请求api
public interface Api {
@GET("users/{user}/repos")
Single<List<Repo>> listRepos(@Path("user") String user);
}
这里讲解下Single ,我这里使用的是Single,为什么使用它而没有使用Observable或者是Flowable,我个人觉得因为Single请求返回的只有成功或者失败两种状态的返回方法,与网络请求失败或成功一致,因此我这里选择了Single而不是其他两种,也有大佬说在网络请求中使用Single比较好一点这也是选择它的原因之一,在实际项目中我这里的也是选择Single的。
Repo实体类的内容比较多我这里就不贴了,主要还是看下网络请求OkhttpClient和Retrofit的相关代码
主要的地方我讲下 其他的最后我会把代码上传到github,大家可以到github上看下就可以了
subscribeOn和observeOn
相信很多开发工作者对这两种该如何使用有点犯晕,不知道这两种该如何选择使用,甚至在代码中乱用的现象,有时候我看我同事使用的时候也会出现这种的现象。如何使用呢?其实也是很简单的。首先它们都是切换线程用的,它们都可以切换到主线程或者子线程。比较通俗的讲subscribeOn是对上游进行切换线程使用的,observeOn是对下游线程进行切换使用的。什么是上游什么又是下游呢?其实也是很简单的,我们都知道rxjava是链式调用的,这里的上游就是以你切换线程的地方作为分界点,上方就称为上游,反之为下游。比如上游需要进行耗时io操作。那么我们就可以使用subscribeOn(Schedulers.io()) ,当然如果你了解了源码,你会发现Schedulers可以选择的线程方式开始有很多种的,比如说newThread computation或者single等等。默认的使用的是computation
computation
用于CPU密集型的计算任务,不适合I/O操作
newThread
为每个任务创建一个新的线程。
single
single拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有
任务执行时,它的任务将会按照先进先出的顺序依次执行。
先来个简单的例子
Single<String> stringSingle = Single.just("1");
stringSingle
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
mTextView.setText(s);
}
@Override
public void onError(Throwable e) {
}
});
先讲解上面的代码,首先创建一个单一的事件流,通过订阅subscribe发送事件流,把数据发送到onSuccess
到这里只是简单的描述了下整个事件流的事件执行过程,现在从源码来了解下它是如何返回的single对象。
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
方法块的第一行判断是否为null,第二行onAssembly这个方法其实是一个钩子,可以点进去看下,代码如下:
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onSingleAssembly是一个全局变量,它是对你的single进行一个统一的处理,比如增加日志,但我们这里是不需要使用到它的。也就是直接返回source而没有执行!=null判断条件下的代码块。这里可以忽略它。
从上面的代码可以分析到,其实这里只是创建了一个SingleJust就返回了
我们在点击SingleJust看下它的源码长什么样
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
这段代码其实也很好理解,就是把发送的1这个值存储起来,subscribeActual这个方法,稍后再讲解下,接下来我们看下订阅事件的subscribe
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
方法块的第一行判断null就不说了,第二行其实也是一个hook(钩子)方法。点击去看,你会发现跟上面的钩子方法是很像的。这个hook在默认的情况下也是没有用到的。这个方法最核心的代码 subscribeActual(observer),我们点进去看下,会发现这行代码是一个抽象方法。它是在哪里执行的呢。其实就是上面讲解的SingleJust中还没有讲解到的那个方法。也就是这段代码,如下:
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
这就把创建的被观察者和观察者联系起来了。
整个流程图
有时候我们在开发的时候也会遇到这样一种场景,就是当前的activity不存在的情况,代码正在执行onSuccess方法,如果是在进行UI操作,由于找不到对应的ui控件而直接crash的情况。那么我们该怎么做呢?这里就要用到Disposables。在当前activity被onDestory之前阻断事件流。
在讲解Disposable原理前,我们对被观察者分为后续延迟性和非后续非延迟性进行分类。比如上面提到的Single.just就属于非后续非延迟性被观察者,Observable.just就属于后续延迟性被观察者。
这里给个简单的例子来分析下Observable的后续和延迟性
Observable
.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
}
});
通过入口interval找到IntervalObserver,先来分析下这个类
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
它继承至AtomicReference<Disposable>,AtomicReference可以保证原子操作的Disposable类对象的引用。实现了Disposable接口,这里我们就能够知道它内部的操作是可以被打断的,但实际真正打断的却是AtomicReference类中的打断方法,也就是DisposableHelper.dispose(this);点进去可以发现他就是通过真正可以取消的Dispose去执行取消的操作。那么它真正的取消的对象又是在哪呢?其实就是上面的 DisposableHelper.setOnce(this, d),而它是通过执行setResource方法,而这个方法又是在执行定时任务可取消的 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit)得到的,整个过程就算理顺了,同时他还实现了Runnable,这就保证了被观察者具有延迟和持续性的条件,这里也可以看出它是在子线程中进行计时操作的。到最后在进行ui操作是需要切换到UI线程执行的,那么这里就可以理解为它是通过子线程切换到主线程然后循环往复操作的。它的作用就是让下游处理的数据比上游晚,这样就可以持续和数据的稳定。
我们都知道rxjava有很多的操作符,接下来讲解下rxjava比较常见的操作符。
Map操作符
map字面意思是映射,通过映射操作符返回的结果通过map做下转换。比如说事件源是一个int类型我们希望转换成String类型,这个我们就可以用map操作符来实现,比如:
Single<Integer> stringSingle = Single.just(1);
stringSingle
.map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
return String.valueOf(s);
}
})
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
mTextView.setText(s);
}
@Override
public void onError(Throwable e) {
}
});
接着从源码解读map是如何实现类型转换的
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
onAssembly上面已经说过了,这里我们直接分析下SingleMap
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
对source和mapper两个参数直接赋值,source指的是事件来源,来源指的Single<Integer>,mapper指的Function,它相当于一个转换器。
接着看下面的代码
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
了解到map把最上层single包裹在内部,通过内部的上层的Single来订阅事件并向下传递事件。这里的范型t就是我们最外层看到的new Function<Integer, String>()
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
关键代码 v = ObjectHelper.requireNonNull(mapper.apply(value)
它除了判断mapper.apply(value)是否为空外还对其进行了赋值操作。返回的R指的是转换过后的值。
通过源码能够了解到,从最外层链式调用让我们直接想到构建模式,但从源码可以看到内部都是通过new 新创建的
执行流程如下
rxjava2中的操作符其实大部分都是这样的。讲到这大部分都讲完了,如果读者觉得还有些没讲到的可以在评论区补充,讲的不到位的或者有误的还望订正
为了让大家更好的理解,个人觉得有些从网上找来图片能够更好的解释一些问题,如有侵权,请联系我,我会第一时间删掉。后续还会讲到retrofit源码解析以及自定义view的使用及实战都会在这个项目中使用
代码传送门
👏👏👏创作不易,你的鼓励就是我最大的创作动力 欢迎star 点赞👍