Android RxLife 一款轻量级别的RxJava生命周期管理库(一)

简介

RxLife是一款轻量级别的RxJava生命周期管理库,代码侵入性极低,随用随取,不需要做任何准备工作,支持在Activity/Fragment 的任意生命周期方法断开管道。

原理

RxLife通过Jetpack 下的 Lifecycle 获取 Activity/Fragment 的生命周期变化,并通过Observable.lift(ObservableOperator) 操作符,注入自己实现的Observer对象(该对象能感知 Activity/Fragment的生命周期变化),从而在onSubscribe(Disposable d)方法中拿到Disposable对象,随后在相应的生命周期回调里执行Disposable.dispose()方法断开管道,这样就能将lift操作符上面的所有Disposable对象全部断开。

为什么要重复造轮子

熟悉RxJava的同学应该都知道trello/RxLifecycle 项目,它在目前的3.0.0版本中通过Lifecycle感知Activity/Fragment 的生命周期变化,并通过BehaviorSubject类及composetakeUntil操作符来实现管道的中断,这种实现原理有一点不足的是,它在管道断开后,始终会往下游发送一个onComplete事件,这对于在onComplete事件中有业务逻辑的同学来说,无疑是致命的。那为什么会这样呢?因为takeUntil操作符内部实现机制就是这样的,有兴趣的同学可以去阅读takeUntil操作符的源码,这里不展开。而RxLife就不会有这样问题,因为在原理上RxLife就与trello/RxLifecycle不同,并且RxLife还在lift操作都的基础上提供了一些额外的api,能有效的避免因RxJava内部类持有Activity/Fragment的引用,而造成的内存泄漏问题,下面开始讲解。

gradle依赖


implementation 'com.rxjava.rxlife:rxlife:1.0.4'

源码下载

用法


Observable.timer(10, TimeUnit.SECONDS)

        //默认在onDestroy时中断管道

        .lift(RxLife.lift(this))

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

//或者

Observable.timer(10, TimeUnit.SECONDS)

        //指定在onStop时中断管道

        .lift(RxLife.lift(this,Event.ON_STOP))

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

在Activity/Fragment 中,使用Observable的lift()操作符,方法中传入RxLife.lift(this),如果需要指定生命周期方法,额外再传一个Event对象即可。怎么样??是不是极其简单,根本不需要做任何准备工作,代码侵入性极低。

处理内存泄漏

我们来看一个案例


public void leakcanary(View view) {

    Observable.timer(100, TimeUnit.MILLISECONDS)

            .map(new MyFunction<>()) //阻塞操作

            .lift(RxLife.lift(this))

            .subscribe(new Consumer<Long>() { //这里使用匿名内部类,持有Activity的引用

                //注意这里不能使用Lambda表达式,否则leakcanary检测不到内存泄漏

                @Override

                public void accept(Long aLong) throws Exception {

                    Log.e("LJX", "accept =" + aLong);

                }

            });

}

//这里使用静态内部类,不会持有外部类的引用

static class MyFunction<T> implements Function<T, T> {

    @Override

    public T apply(T t) throws Exception {

        //当dispose时,第一次睡眠会被吵醒,接着便会进入第二次睡眠

        try {

            Thread.sleep(3000);

        } catch (Exception e) {

        }

        try {

            Thread.sleep(30000);

        } catch (Exception e) {

        }

        return t;

    }

}

上面的代码会造成Activity无法回收,导致内存泄漏,我们用Leakcannry工具来检测一下,发现确实造成来内存泄漏,如下

在这里插入图片描述

我们已经使用RxLife库,会自动中断管道,那为什么还会造成内存泄漏呢?其实原因很简单,我们只是中断了管道,而没有中断上游对下游引用。看上面的截图就能知道,上游始终持有下游的引用,而最下游的匿名内部类Consumer又持有了Activity的引用,所以就导致了Activity无法回收。

那为什么中断管道时,不会中断上下游的引用呢?

首先有一点我们需要明确,调用Disposable.dispose()方法来断开管道,并不是真正意义上的将上游与下游断开,它只是改变了管道上各个Observer对象的一个标志位的值,我们来看一下LambdaObserver类的源码就会知道


@Override

    public void dispose() {

        DisposableHelper.dispose(this);

    }

呃呃,只有一行代码,我们继续


public static boolean dispose(AtomicReference<Disposable> field) {

        Disposable current = field.get(); //此处得到上游的Disposable对象

        Disposable d = DISPOSED;

        if (current != d) {

            current = field.getAndSet(d); //更改自己的标志位为DISPOSED

            if (current != d) {

                if (current != null) {

                    current.dispose();//关闭上游的Disposable对象

                }

                return true;

            }

        }

        return false;

    }

可以看到,这里只做了两件事,一是更改自己的标志位,二是调用上游的dispose()方法,其实你只要多看看,你就发现,RxJava内部大多数Observer在dispose()方法都会干这两件事。

到这,我们该如何解决这个内存泄漏问题呢?其实,RxJava早就想到了这一点,它给我们提供了一个onTerminateDetach()操作符,这个操作符会在onError(Throwable t)onComplete()dispose()这个3个时刻,断开上游对下游的引用,我们来看看源码,源码在ObservableDetach类中


@Override

public void dispose() {

    Disposable d = this.upstream;

    this.upstream = EmptyComponent.INSTANCE;//上游重新赋值

    this.downstream = EmptyComponent.asObserver();//下游重新赋值

    d.dispose();//调用上游的dispose()方法

}

@Override

public void onError(Throwable t) {

    Observer<? super T> a = downstream;

    this.upstream = EmptyComponent.INSTANCE;//上游重新赋值

    this.downstream = EmptyComponent.asObserver();//下游重新赋值

    a.onError(t); //调用下游的onError方法

}

@Override

public void onComplete() {

    Observer<? super T> a = downstream;

    this.upstream = EmptyComponent.INSTANCE;//上游重新赋值

    this.downstream = EmptyComponent.asObserver();//下游重新赋值

    a.onComplete();//调用下游的onComplete方法

}

到这,我们就知道该怎么做了,下面这样写就安全了


Observable.timer(100, TimeUnit.MILLISECONDS)

        .map(new MyFunction<>())//阻塞操作

        .onTerminateDetach() //管道断开时,中断上游对下游的引用

        .lift(RxLife.lift(this)) //默认在onDestroy时断开管道

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

可是,每次都要这样写吗?有没有更简单的,有,RxLife提供了RxLife.compose(LifecycleOwner)方法,内部就是将onTerminateDetachlift这两个操作符整合在了一起,接下来,看看如何使用


Observable.timer(100, TimeUnit.MILLISECONDS)

        .map(new MyFunction<>())//阻塞操作

        //注意这里使用compose操作符

        .compose(RxLife.compose(this))//默认在onDestroy时中断管道,并中断下下游之间的引用

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

如果需要指定生命周期的方法,也可以


Observable.timer(100, TimeUnit.MILLISECONDS)

        .map(new MyFunction<>())//阻塞操作

        //注意这里使用compose操作符

        .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop时断开管道

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

    }

大多数情况下,我们希望观察者能主线程进行回调,也许你会这样写


Observable.timer(100, TimeUnit.MILLISECONDS)

        .map(new MyFunction<>())//阻塞操作

        .observeOn(AndroidSchedulers.mainThread()) //在主线程回调

        .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop回调时中断管道,并中断上下游引用

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

如果你是用RxLife的话,就可以这样写,使用RxLife.composeOnMain方法


Observable.timer(100, TimeUnit.MILLISECONDS)

        .map(new MyFunction<>())//阻塞操作

        //在主线程进程回调,在onStop回调时中断管道,并中断上下游引用

        .compose(RxLife.composeOnMain(this, Event.ON_STOP))

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

RxLife类就只有6个静态方法,如下

shshs

注意,前方高能预警!!!!!!!

结合RxLife使用Observable的liftcompose操作符时,下游除了subscribe操作符外最好不要有其它的操作符,前面讲过,当调用Disposable.dispose()时,它会往上一层一层的调用上游的dispose()方法,如果下游有Disposable对象,是调用不到的,如果此时下游有自己的事件需要发送,那么就无法拦截了。

如:


Observable.just(1)

        .compose(RxLife.compose(this))

        .flatMap((Function<Integer, ObservableSource<Long>>) integer -> {

            //每隔一秒发送一个数据,共10个

            return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS);

        })

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

这样,即使Activity关闭了,观察者每隔一秒后,依然能收到来自上游的事件,因为compose无法切断下游的管道,我们改一下上面的代码


Observable.just(1)

        .flatMap((Function<Integer, ObservableSource<Long>>) integer -> {

            //每隔一秒发送一个数据,共10个

            return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS);

        })

        .compose(RxLife.compose(this))

        .subscribe(aLong -> {

            Log.e("LJX", "accept =" + aLong);

        });

这样ok了,其实这不是RxLife的问题,使用鼎鼎大名的trello/RxLifecycle库也是一样的,因为RxJava的设计就是如此,上游拿不到下游的Disposable对象,所以,我们在使用RxLife时,一定要注意在lift或者compose操作符的下游,除了subscribe操作符外最好不要有其它的操作符,这一点一定需要注意。

RxLife最新版本已经使用as操作符规避这个问题,详情查看Android RxLife 一款轻量级别的RxJava生命周期管理库(二)

小彩蛋

RxLife类里面的life、compose系列方法,皆适用于Flowable、Observable、Single、Maybe、Completable这5个被观察者对象,道理都一样,这里不在一一讲解。

结尾

Ok,RxLife的使用基本就介绍完了,到这我们会发现,使用RxLife库,我们只需要关注一个类即可,那即是RxLife类,api简单功能却强大。敢兴趣的同学,可以去阅读RxLife源码,有疑问,请留言,我会在第一时间作答。

扩展

RxLife结合HttpSender发送请求,简直不要太爽。

HttpSender详情请点击HttpSender OkHttp+RxJava超好用、功能超级强大的Http请求框架

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343