Android面试Android进阶(十九)-RxJava相关问题-未分析完,先跳过

在看RxJava源码的过程中发生了以下的事情


艹.gif

头昏脑涨,怀疑人生,生无可恋还有啥欢迎补充!!
国际惯例,抛出一些问题先

问:说说你对RxJava的理解

答:RxJava是一个基于观察者模式设计的开源库,它的强大地方主要在于链式调用的api、操作符以及线程切换,在异步任务中,不再需要那种callBack方式的无限嵌套。
面试官:

image.png

你:大佬,等等,看看我自己撸的MyRxJava先吧,你这问题我不知道该怎么回答。传送门

首先要知道什么是观察者模式:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知。 例如:微信公众号,一群人关注一个公众号,当公众号内容有更新的时候就会发出通知,关注者就能知道内容更新了,这里面人就是观察者,公众号就是被观察者,被观察者数据发生改变时,观察者能得到通知。

既然要自己撸一个RxJava,那先看看RxJava如何使用的:

        Observable.create(object : ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("真实观察者在发送东西啦")
                emitter.onComplete()
            }
        }).subscribe(object : Observer<String>{
            override fun onComplete() {

            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
            }

            override fun onError(e: Throwable) {
            }
        })

通过上面,我们知道RxJava中的观察者模式两个角色、一个动作、四个事件:
1、Observable:被观察者
2、Observer:观察者
3、Subscribe:订阅
4、Event:被观察者通知观察者的事件(onNext()、onError()、onComplete()、onSubscribe())

一、新建一个Observable、Observer两个类

/**
 * 被观察者
 * 开发者在被观察者发送数据通知观察者时,其实也并不知道要发送什么数据,所以理应也是应该是一个接口,交给开发者自己处理。
 * 但是在RxJava中,ObservableOnSubscribe才是真正的被观察者,所以实际上被观察者为ObservableOnSubscribe
 */
class Observable {
}

/**
 * 观察者
 * 在开发者使用时,这个观察者收到被观察者通知时并不知道要做什么事情,所以这里是一个接口
 */
interface Observer {
}

/**
 * 真实被观察者
 */
interface ObservableOnSubscribe{
}

实际上,RxJava并不知道观察者收到被观察者数据更新通知时要做什么,所以设计成了接口,让开发者自己去实现。同样的被观察者也一样,实际上RxJava真正的被观察者为:ObservableOnSubscribe,意为订阅时的被观察者,所以这个才是真正的被观察者。

二、主要的类有了,想想RxJava是怎么使用的,首先先调用create静态方法传入真实被观察者的实现返回被观察者,之后调用subscribe方法传入观察者的实现(被观察者和观察者建立联系的步骤)。同时,我们知道真实被观察者及观察者的之间要传递的数据类型都不确定,所以需要使用泛型。还有被观察者通知观察者的四种事件。修改一下代码:

//1、修改真实被观察者支持泛型
interface ObservableOnSubscribe<T>{
}

class Observable {
    companion object{
        //3、被观察者中新建静态方法create,接收真实被观察者,返回被观察者
        fun <T> create(source: ObservableOnSubscribe<T>): Observable {
            return Observable()
        }
    }
    //4、被观察者和观察者建立联系
    fun <T> subscribe(observer: Observer<T>) {

    }
}

//2、观察者增加四种事件类型,支持泛型
interface Observer<T> {
    fun onNext(value:T)
    fun onComplete()
    fun onError(e:Throwable)
    fun onSubscribe()
}

三、到这里,真实被观察者目前还只是一个接口类,开发者不知道如何发送数据,RxJava中将发送数据交给了发射器ObservableEmitter:Emitter,并且提供一个待开发者实现的方法,在方法中利用发射器进行数据发送。

//真实被观察者添加一个待实现方法,开发者实现后可以使用发射器来发送数据
interface ObservableOnSubscribe<T> {
    //实现这个方法后,用发射器来发射数据
    fun subscribe(emitter:ObservableEmitter<T>)
}

//被观察者发射器
interface ObservableEmitter<T> : Emitter<T> {
    //实际上这里还有很多的其他方法,如是否可以取消,是否处理了等等,这里不做分析了
}
//发射器
interface Emitter<T> {
    fun onNext(value:T)
    fun onComplete()
    fun onError(e:Throwable)
}

来,我们试试使用:

        Observable.create(object : ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("开始尝试发射数据了")
                emitter.onComplete()
            }
        }).subscribe(object : Observer<String>{
            override fun onNext(value: String) {
                Log.e("XYX",value)
            }

            override fun onComplete() {
                //RxJava中如果调用了onComplete或onError方法之后,再发送数据就无效
                Log.e("XYX", "完成了")
            }

            override fun onError(e: Throwable) {
                Log.e("XYX", "出错了")
            }

            override fun onSubscribe() {
                Log.e("XYX", "订阅了")
            }
        })

什么?啥也没打印?
可不么,真实被观察者利用发射器调用onNext()方法发送数据,往哪里发送没告诉他,执行了也不报错,但是程序也不知道怎么处理啊。还有被观察者subscribe方法只有方法,没有方法体,程序也不知道如何做关联订阅啊~

四、接下来还要实现发射器往观察者中发射数据,怎么发?当然是直接调用方法发了,怎么调用方法?拿到观察者的引用不就行了?
发射器是一个接口,新建一个发射器的实现类,接收观察者对象,并实现发射器的三个方法:

class CreateEmitter<T>(val observer: Observer<T>) : ObservableEmitter<T> {
    override fun onNext(value: T) {
        observer.onNext(value)
    }

    override fun onComplete() {
        observer.onComplete()
    }

    override fun onError(e: Throwable) {
        observer.onError(e)
    }
}

class Observable {
    
    companion object{
        fun <T> create(source: ObservableOnSubscribe<T>): Observable {
            return Observable()
        }
    }
    
    fun <T> subscribe(observer: Observer<T>) {
        val emitter: CreateEmitter<T> = CreateEmitter(observer)
    }
}

这里就将观察者和发射器关联,但是发射器还没有和真实被观察者关联,也应该进行关联,要不然被观察中就无法将数据发送给观察者了,怎么关联?没错,create方法传入了一个真实被观察者对象,就它了。


image.png

想想,是先create方法创建一个被观察者,之后才执行的订阅,就算把发射器抽成全局,create方法的发射器还没初始化,且发射器的初始化需要关联观察者,怎么办?简单,把这个create传进来的真实被观察者存起来就行了。这里通过修改Observable构造方法的方式来进行修改:

class Observable<T>(private val source: ObservableOnSubscribe<T>) {

    companion object{
        fun <T> create(source: ObservableOnSubscribe<T>): Observable<T> {
            return Observable(source)
        }
    }

    fun subscribe(observer: Observer<T>) {
        //订阅成功就应该先执行订阅方法
        observer.onSubscribe()
        //将发射器与观察者关联 
        val emitter: CreateEmitter<T> = CreateEmitter(observer)
        //将真实被观察者发射器关联
        source.subscribe(emitter)
    }
}

这不,手撸一个RxJava就完成了!
什么?完成了?不信你看看:

image.png

其实,这里只是实现了RxJava中一点点点点,实现了一个观察者模式。
总结一下:
1、使用被观察者的静态方法create创建一个真正的被观察者对象,然后设置到被观察者的source对象中
2、调用被观察者的subscribe方法传入观察者对象实现观察者与被观察者的订阅关系,并立马调用观察者的onSubscribe方法通知订阅成功,然后创建发射器传入观察者,使观察者和发射器关联。
3、真正的被观察者subscribe方法中通过发射器调用观察者的方法发送数据。
4、观察者收到数据
image.png

RxJava中还有操作符,既然都到这了,要不就再实现一个操作符来玩玩?说干就要干,裤子都脱了,不干是傻瓜!
RxJava中有各种各样的操作符,例如:map、just、range等等,具体可以搜索一下,如果你比较懒,那就点击这里RxJava操作符
我们实现一个操作符,就挑个常用的map吧。

实现一个map操作符

RxJava的map操作符有转换的功能,实际上就是将被观察者的数据换了,返回一个被观察者后继续执行,所以这里应该是在create调用之后的方法,而且返回的是新的数据类型。

    fun <R> map():Observable<R>{
        //返回的新的观察者要起到承上启下的作用
    }

新的真实观察者就叫ObservableMap吧,RxJava中有各种各样Observablexxx,就是使用装饰者模式来设计的(在不改变原有对象的基础之上,将功能附加到对象上。提供了比继承更有弹性的替代方案、扩展原有对象功能),这个类要做的就是获取上级的被观察者,经过自身转化后再与下级观察者实现订阅关系即:要有一个观察者观察上级被观察者,同时是有一个被观察者,能被下级观察者观察
新建ObservableMap:

/**
 * 创建一个用于被下级观察的被观察者
 * 接收上级真实被观察者,以及数据变换规则的高阶函数
 * 这里要两个泛型,一个是原始数据T,一个是变换后的数据R
 * 实现真实的被观察
 */
class ObservableMap<T, R>(private val source:ObservableOnSubscribe<T>, private val func:((T)->R)):ObservableOnSubscribe<R>{
}

上面说了 map方法是要返回一个被观察者,起到承上启下的作用
修改一下map方法

    fun <R> map(func:(T)->R):Observable<R>{
        //实例化一个新的被观察者,要起到承上启下的作用,所以将真实的被观察者传入。
        // 在新被观察者中需要做的事情就是事件转化,所以将开发者自己实现的转换规则也传入
        val map=ObservableMap(this.source,func)
        return Observable(map)
    }

ObservableMap类中要实现观察上级被观察者数据,同时要被下级观察者观察,所以里面肯定也要有一个观察者,用于观察上级真实被观察者source,首先来创建一个观察者:

    class MapObserver<T,R>(private val emitter:ObservableEmitter<R>, private val func:((T)->R)):Observer<T>{

        override fun onNext(item: T) {
            //在发送数据前按照开发者设置的数据变换规则获取结果
            val result=func.invoke(item)
            //利用发射器发送最新的数据
            emitter.onNext(result)
        }

        override fun onError(e: Throwable) {
            emitter.onError(e)
        }

        override fun onComplete() {
            emitter.onComplete()
        }

        override fun onSubscribe() {

        }
    }

接下来想想 ObservableMap实现了ObservableOnSubscribe这个真实被观察者,subscribe方法中需要做哪些是:
1、source需要设置新的发射器
2、创建新的发射器要有一个观察者
3、创建新的观察者对象

    override fun subscribe(emitter: ObservableEmitter<R>) {
        //创建真实观察者,在真实观察者中接收原始数据后,按照高阶函数定义的修改规则获取最新的结果后重新发射数据
        val map=MapObserver(emitter, func)
        //用新的MapObserver重新调用onSubscribe
        map.onSubscribe()
        //创建新的发射器,用新的发射器来发送数据
        val createEmitter = CreateEmitter(map)
        //最终在新的被观察者的subscribe方法中,使用新的发射器进行发送数据
        source.subscribe(createEmitter)
    }

最终的代码如下:

    //-----------Observable类中增加map方法:
    //map操作符,变换数据后再发送
    fun <R> map(func:(T)->R):Observable<R>{
        //实例化一个新的被观察者,要起到承上启下的作用,所以将真实的被观察者传入。
        // 在新被观察者中需要做的事情就是事件转化,所以将开发者自己实现的转换规则也传入
        val map=ObservableMap(this.source,func)
        return Observable(map)
    }

    //新增ObservableMap类:

/**
 * 创建一个用于被下级观察的被观察者
 * 接收上级真实被观察者,以及数据变换规则的高阶函数
 * 这里要两个泛型,一个是原始数据T,一个是变换后的数据R
 * 实现真实的被观察
 */
class ObservableMap<T, R>(private val source:ObservableOnSubscribe<T>, private val func:((T)->R)):ObservableOnSubscribe<R>{

    override fun subscribe(emitter: ObservableEmitter<R>) {
        //创建真实观察者,在真实观察者中接收原始数据后,按照高阶函数定义的修改规则获取最新的结果后重新发射数据
        val map=MapObserver(emitter, func)
        //用新的MapObserver重新调用onSubscribe
        map.onSubscribe()
        //创建新的发射器,用新的发射器来发送数据
        val createEmitter = CreateEmitter(map)
        //最终在新的被观察者的subscribe方法中,使用新的发射器进行发送数据
        source.subscribe(createEmitter)
    }

    class MapObserver<T,R>(private val emitter:ObservableEmitter<R>, private val func:((T)->R)):Observer<T>{

        override fun onNext(item: T) {
            val result=func.invoke(item)
            emitter.onNext(result)
        }

        override fun onError(e: Throwable) {
            emitter.onError(e)
        }

        override fun onComplete() {
            emitter.onComplete()
        }

        override fun onSubscribe() {

        }
    }
   
}

//使用 
Observable.create(object:ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("开始尝试发射数据了")
                emitter.onComplete()
            }
        }).map {
            "经过map后的数据" + it
        }.subscribe(object : Observer<String>{
            override fun onNext(value: String) {
                Log.e("XYX", value)
            }

            override fun onComplete() {
                Log.e("XYX", "完成了")
            }

            override fun onError(e: Throwable) {
                Log.e("XYX", "出错了")
            }

            override fun onSubscribe() {
                Log.e("XYX", "订阅了")
            }
        })

操作符map就做完了,我们来看看结果:


image.png

//TODO 关于RxJava切换线程的内容,将来再补吧,RxJava真的是一个强大而复杂的开源框架,也不好理解。。。以上代码的传送门

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容