Rxlifecycle (https://github.com/trello/RxLifecycle)是基于RxJava对Android生命周期管理的一个库。
gradle引用库,components包下结构
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'
支持Activity、Fragment以及期子类和support包下分别对应,RxFragment和RxActivity内部原理一样只是多了几个Fragment生命周期的方法,选取RxActivity进行查看源码。
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
public final Observable<ActivityEvent> lifecycle() {
return lifecycleSubject.hide();
}
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
protected void onStart() {
super.onStart();
lifecycleSubject.onNext(ActivityEvent.START);
}
protected void onResume() {
super.onResume();
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
protected void onPause() {
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
protected void onStop() {
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
protected void onDestroy() {
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
RxActivity中一个BehaviorSubject对象,三个公共方法,还有根据Activity生命周期发射ActivityEvent事件。
首先分析BehaviorSubject有何作用
public final class BehaviorSubject<T> extends Subject<T> {
...
}
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
...
}
BehaviorSubject继承Subject,Subject集成Observable实现Observer,Subject的官方介绍:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。也就是Subject既可以作为被观察者,也可以作为观察者。
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
public void testBehavior() {
final BehaviorSubject<Integer> lifecycleSubject = BehaviorSubject.createDefault(99);
lifecycleSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept()--" + integer);//此时还没有收到任何数据,所以收到默认值99
//然后会受到后续发射的所有值0,1,2,3
}
});
lifecycleSubject.onNext(0);
lifecycleSubject.onNext(1);
lifecycleSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("after 1 accept()--" + integer);//只能收到最近发射的数据1
//然后会受到后续发射的所有值2,3
}
});
lifecycleSubject.onNext(2);
lifecycleSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("after 2 accept()--" + integer);//只能收到最近发射的数据2
//然后会受到后续发射的所有值3
}
});
lifecycleSubject.onNext(3);
}
RxActivity 中lifecycleSubject 在Activity生命周期的每个方法都发送了ActivityEvent数据,此时订阅lifecycleSubject 就可以收到Activity周期方法的回调
查看RxActivity中的三个方法,Observable<ActivityEvent> lifecycle(),LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event),LifecycleTransformer<T> bindToLifecycle()
首先来看Observable<ActivityEvent> lifecycle()方法
public final Observable<ActivityEvent> lifecycle() {
return lifecycleSubject.hide();
}
Observable<T> hide()方法官方解释隐藏这个Observable的身份及其一次性。即lifecycle()方法返回一个Observable对象然后来看LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event)方法
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
//调用RxLifecycle.bindUntilEvent(lifecycleSubject, event)方法
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle,
@Nonnull final R event) {
//先调用takeUntilEvent(lifecycle, event)
//然后调用bind(@Nonnull final Observable<R> lifecycle)
return bind(takeUntilEvent(lifecycle, event));
}
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
//filter操作符根据ActivityEvent 进行过滤,在指定的ActivityEvent 事件时才往下游执行
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
//把Observable转为LifecycleTransformer,返回LifecycleTransformer对象
return new LifecycleTransformer<>(lifecycle);
}
bindUntilEvent(@NonNull ActivityEvent event) 最终返回一个根据ActivityEvent 过滤的Observable转换为LifecycleTransformer对象,LifecycleTransformer实现了ObservableTransformer。
bindUntilEvent常用于网络请求,然后监听Activity onDestroy()时,结束网络请求,避免Activity销毁后网络请求返回成功刷新界面View空指针。模拟网络请求代码,10秒后返回数据
BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
Observable.timer(10, TimeUnit.SECONDS)
.compose(RxLifecycle.bindUntilEvent(lifecycleSubject, ActivityEvent.DESTROY))
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
//返回网络请求数据
}
});
lifecycleSubject.onNext(ActivityEvent.DESTROY);
然后lifecycleSubject发射一个ActivityEvent.DESTROY事件,然后网络回调事件就不会在执行下去。那么为什么compose(LifecycleTransformer)后就能停止事件不在执行呢?首先看一下compose操作符
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
传入一个ObservableTransformer,然后调用ObservableTransformer.apply(Observable)方法。然后调用wrap(ObservableSource<T> source)方法,返回Observable对象
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
wrap方法最终返回真实ObservableFromUnsafeSource对象
此时来分析观察者对象Observer事件为何不会执行下去。订阅事件最终会调用subscribeActual(Observer)抽象方法,此方法的实现在真实Observable类型中。从wrap方法得知真实的Observable即为ObservableFromUnsafeSource。
public final class ObservableFromUnsafeSource<T> extends Observable<T> {
final ObservableSource<T> source;
public ObservableFromUnsafeSource(ObservableSource<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(observer);
}
}
ObservableFromUnsafeSource中的subscribeActual方法,又调用了ObservableSource.subscribe方法,那么ObservableSource对象又是什么呢?再来看wrap方法传入的参数即ObservableTransformer.apply(Observable)方法返回的对象。此ObservableTransformer对象即LifecycleTransformer对象,那么我们看一下LifecycleTransformer源码。
//精简之后的代码
public final class LifecycleTransformer<T> implements ObservableTransformer<T, T> {
final Observable<?> observable;
LifecycleTransformer(Observable<?> observable) {
checkNotNull(observable, "observable == null");
this.observable = observable;
}
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}
}
这里边有两个Observable对象,回想一下ObservableTransformer.apply(Observable)中Observable传入的是this,即最原始的被观察者对象,也就是Observable<T> upstream,Observable<?> observable对象就是BehaviorSubject<ActivityEvent>。此时监听Activity生命周期BehaviorSubject对象就和原始被观察者对象(即网络请求Observable)就被关联在一起了,然后用原始被观察者对象调用takeUntil()方法,传入的对象为BehaviorSubject。我们来看一下takeUntil()操作的官方解释
TakeUntil是使用一个标志Observable是否发射数据来判断,当标志Observable没有发射数据时,正常发射数据,而一旦标志Observable发射过了数据则后面的数据都会被丢弃。
此时分析,网络请求Observable对象使用TakeUntil操作符关联BehaviorSubject对象,BehaviorSubject对象发射了数据,然后网络请求Observable则丢失数据,即不在调用观察者Observer,BehaviorSubject过滤了ActivityEvent事件,只有事件为ActivityEvent.DESTROY时,BehaviorSubject才发射,所以当Activity关闭时,BehaviorSubject发射ActivityEvent.DESTROY事件,网络请求Observable丢失数据,不在调用观察者Observer,整个网络请求和Activity生命周期绑定完成。
最后来看RxActivity最后一个LifecycleTransformer<T> bindToLifecycle()方法
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
然后看一下ACTIVITY_LIFECYCLE
private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
switch (lastEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
case DESTROY:
throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
default:
throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
}
}
};
Function只是一个接口,调用apply(T)方法,返回泛型R,是一个类型转换的方法
public interface Function<T, R> {
@NonNull
R apply(@NonNull T t) throws Exception;
}
ACTIVITY_LIFECYCLE 中根据Activity生命周期一一对应做了ActivityEvent 转换。
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
我们来看map操作符,最终调用MapObserver
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v;
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
actual.onNext(v);
}
}
其中mapper为Function对象,onNext方法中调用mapper.apply(t),做了一个转换,即上面的ACTIVITY_LIFECYCLE ,ActivityEvent.CREATE和ActivityEvent.DESTROY的一个转换。
take()只发射前面的N项数据,忽略剩余的数据,take(1)即只发射前面一项数据
skip()抑制Observable发射的前N项数据,只保留之后的数据,skip(1)即忽略发射的前1项数据
combineLatest()操作符在原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
最终调用 bind(@Nonnull final Observable<R> lifecycle)方法
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
LifecycleTransformer<T> bindToLifecycle()方法作用和LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event)方法是一样的,只是根据ActivityEvent事件做了一个转换。
加入网络请求在onStart调用,会在onStop停止。